YARN-11182. Refactor TestAggregatedLogDeletionService: 2nd phase. Contributed by Szilard Nemeth.
This commit is contained in:
parent
36c4be819f
commit
5d08ffa769
|
@ -59,7 +59,7 @@ public class AggregatedLogDeletionService extends AbstractService {
|
|||
private long checkIntervalMsecs;
|
||||
private LogDeletionTask task;
|
||||
|
||||
static class LogDeletionTask extends TimerTask {
|
||||
public static class LogDeletionTask extends TimerTask {
|
||||
private Configuration conf;
|
||||
private long retentionMillis;
|
||||
private String suffix = null;
|
||||
|
@ -101,7 +101,7 @@ public class AggregatedLogDeletionService extends AbstractService {
|
|||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
logException("Error reading root log dir this deletion " +
|
||||
logException("Error reading root log dir, this deletion " +
|
||||
"attempt is being aborted", t);
|
||||
}
|
||||
LOG.info("aggregated log deletion finished.");
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.logaggregation;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
|
||||
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT;
|
||||
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT;
|
||||
|
||||
|
||||
public class LogAggregationTestUtils {
|
||||
public static final String REMOTE_LOG_ROOT = "target/app-logs/";
|
||||
|
||||
public static void enableFileControllers(Configuration conf,
|
||||
List<Class<? extends LogAggregationFileController>> fileControllers,
|
||||
List<String> fileControllerNames) {
|
||||
enableFcs(conf, REMOTE_LOG_ROOT, fileControllers, fileControllerNames);
|
||||
}
|
||||
|
||||
public static void enableFileControllers(Configuration conf,
|
||||
String remoteLogRoot,
|
||||
List<Class<? extends LogAggregationFileController>> fileControllers,
|
||||
List<String> fileControllerNames) {
|
||||
enableFcs(conf, remoteLogRoot, fileControllers, fileControllerNames);
|
||||
}
|
||||
|
||||
|
||||
private static void enableFcs(Configuration conf,
|
||||
String remoteLogRoot,
|
||||
List<Class<? extends LogAggregationFileController>> fileControllers,
|
||||
List<String> fileControllerNames) {
|
||||
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
|
||||
StringUtils.join(fileControllerNames, ","));
|
||||
for (int i = 0; i < fileControllers.size(); i++) {
|
||||
Class<? extends LogAggregationFileController> fileController = fileControllers.get(i);
|
||||
String controllerName = fileControllerNames.get(i);
|
||||
|
||||
conf.setClass(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, controllerName),
|
||||
fileController, LogAggregationFileController.class);
|
||||
conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, controllerName),
|
||||
remoteLogRoot + controllerName + "/");
|
||||
conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT, controllerName),
|
||||
controllerName);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,96 +18,54 @@
|
|||
|
||||
package org.apache.hadoop.yarn.logaggregation;
|
||||
|
||||
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FilterFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.util.Lists;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcase;
|
||||
import org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder;
|
||||
import org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.AppDescriptor;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FilterFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.Assert;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
|
||||
import static org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.NO_TIMEOUT;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class TestAggregatedLogDeletionService {
|
||||
|
||||
private static final String T_FILE = "TFile";
|
||||
private static final String I_FILE = "IFile";
|
||||
private static final String USER_ME = "me";
|
||||
private static final String DIR_HOST1 = "host1";
|
||||
private static final String DIR_HOST2 = "host2";
|
||||
|
||||
private static final String ROOT = "mockfs://foo/";
|
||||
private static final String REMOTE_ROOT_LOG_DIR = ROOT + "tmp/logs";
|
||||
private static final String REMOTE_ROOT_LOG_DIR = ROOT + "tmp/logs/";
|
||||
private static final String SUFFIX = "logs";
|
||||
private static final String NEW_SUFFIX = LogAggregationUtils.getBucketSuffix() + SUFFIX;
|
||||
private static final int TEN_DAYS_IN_SECONDS = 10 * 24 * 3600;
|
||||
|
||||
private static PathWithFileStatus createPathWithFileStatusForAppId(Path remoteRootLogDir,
|
||||
ApplicationId appId,
|
||||
String user, String suffix,
|
||||
long modificationTime) {
|
||||
Path path = LogAggregationUtils.getRemoteAppLogDir(
|
||||
remoteRootLogDir, appId, user, suffix);
|
||||
FileStatus fileStatus = createEmptyFileStatus(modificationTime, path);
|
||||
return new PathWithFileStatus(path, fileStatus);
|
||||
}
|
||||
private static final List<Class<? extends LogAggregationFileController>>
|
||||
ALL_FILE_CONTROLLERS = Arrays.asList(
|
||||
LogAggregationIndexedFileController.class,
|
||||
LogAggregationTFileController.class);
|
||||
public static final List<String> ALL_FILE_CONTROLLER_NAMES = Arrays.asList(I_FILE, T_FILE);
|
||||
|
||||
private static FileStatus createEmptyFileStatus(long modificationTime, Path path) {
|
||||
return new FileStatus(0, true, 0, 0, modificationTime, path);
|
||||
}
|
||||
|
||||
private static PathWithFileStatus createFileLogPathWithFileStatus(Path baseDir, String childDir,
|
||||
long modificationTime) {
|
||||
Path logPath = new Path(baseDir, childDir);
|
||||
FileStatus fStatus = createFileStatusWithLengthForFile(10, modificationTime, logPath);
|
||||
return new PathWithFileStatus(logPath, fStatus);
|
||||
}
|
||||
|
||||
private static PathWithFileStatus createDirLogPathWithFileStatus(Path baseDir, String childDir,
|
||||
long modificationTime) {
|
||||
Path logPath = new Path(baseDir, childDir);
|
||||
FileStatus fStatus = createFileStatusWithLengthForDir(10, modificationTime, logPath);
|
||||
return new PathWithFileStatus(logPath, fStatus);
|
||||
}
|
||||
|
||||
private static PathWithFileStatus createDirBucketDirLogPathWithFileStatus(Path remoteRootLogPath,
|
||||
String user,
|
||||
String suffix,
|
||||
ApplicationId appId,
|
||||
long modificationTime) {
|
||||
Path bucketDir = LogAggregationUtils.getRemoteBucketDir(remoteRootLogPath, user, suffix, appId);
|
||||
FileStatus fStatus = new FileStatus(0, true, 0, 0, modificationTime, bucketDir);
|
||||
return new PathWithFileStatus(bucketDir, fStatus);
|
||||
}
|
||||
|
||||
private static FileStatus createFileStatusWithLengthForFile(long length,
|
||||
long modificationTime,
|
||||
Path logPath) {
|
||||
return new FileStatus(length, false, 1, 1, modificationTime, logPath);
|
||||
}
|
||||
|
||||
private static FileStatus createFileStatusWithLengthForDir(long length,
|
||||
long modificationTime,
|
||||
Path logPath) {
|
||||
return new FileStatus(length, true, 1, 1, modificationTime, logPath);
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -138,80 +96,34 @@ public class TestAggregatedLogDeletionService {
|
|||
long toKeepTime = now - (1500 * 1000);
|
||||
|
||||
Configuration conf = setupConfiguration(1800, -1);
|
||||
|
||||
Path rootPath = new Path(ROOT);
|
||||
FileSystem rootFs = rootPath.getFileSystem(conf);
|
||||
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
|
||||
|
||||
Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
|
||||
PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
|
||||
toKeepTime);
|
||||
|
||||
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
|
||||
|
||||
ApplicationId appId1 = ApplicationId.newInstance(now, 1);
|
||||
ApplicationId appId2 = ApplicationId.newInstance(now, 2);
|
||||
ApplicationId appId3 = ApplicationId.newInstance(now, 3);
|
||||
ApplicationId appId4 = ApplicationId.newInstance(now, 4);
|
||||
|
||||
PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
|
||||
toDeleteTime);
|
||||
PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(remoteRootLogPath, SUFFIX,
|
||||
toDeleteTime);
|
||||
|
||||
PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
|
||||
USER_ME, SUFFIX, toDeleteTime);
|
||||
PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
|
||||
USER_ME, SUFFIX, toDeleteTime);
|
||||
PathWithFileStatus app3 = createPathWithFileStatusForAppId(remoteRootLogPath, appId3,
|
||||
USER_ME, SUFFIX, toDeleteTime);
|
||||
PathWithFileStatus app4 = createPathWithFileStatusForAppId(remoteRootLogPath, appId4,
|
||||
USER_ME, SUFFIX, toDeleteTime);
|
||||
|
||||
when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
|
||||
when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
|
||||
when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {
|
||||
app1.fileStatus, app2.fileStatus, app3.fileStatus, app4.fileStatus});
|
||||
|
||||
PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
|
||||
toDeleteTime);
|
||||
PathWithFileStatus app2Log2 = createFileLogPathWithFileStatus(app2.path, DIR_HOST2, toKeepTime);
|
||||
PathWithFileStatus app3Log1 = createFileLogPathWithFileStatus(app3.path, DIR_HOST1,
|
||||
toDeleteTime);
|
||||
PathWithFileStatus app3Log2 = createFileLogPathWithFileStatus(app3.path, DIR_HOST2,
|
||||
toDeleteTime);
|
||||
PathWithFileStatus app4Log1 = createFileLogPathWithFileStatus(app4.path, DIR_HOST1,
|
||||
toDeleteTime);
|
||||
PathWithFileStatus app4Log2 = createFileLogPathWithFileStatus(app4.path, DIR_HOST2, toKeepTime);
|
||||
|
||||
when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{});
|
||||
when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[]{app2Log1.fileStatus,
|
||||
app2Log2.fileStatus});
|
||||
when(mockFs.listStatus(app3.path)).thenReturn(new FileStatus[]{app3Log1.fileStatus,
|
||||
app3Log2.fileStatus});
|
||||
when(mockFs.listStatus(app4.path)).thenReturn(new FileStatus[]{app4Log1.fileStatus,
|
||||
app4Log2.fileStatus});
|
||||
when(mockFs.delete(app3.path, true)).thenThrow(
|
||||
new AccessControlException("Injected Error\nStack Trace :("));
|
||||
|
||||
final List<ApplicationId> finishedApplications = Collections.unmodifiableList(
|
||||
Arrays.asList(appId1, appId2, appId3));
|
||||
final List<ApplicationId> runningApplications = Collections.singletonList(appId4);
|
||||
|
||||
AggregatedLogDeletionService deletionService =
|
||||
new AggregatedLogDeletionServiceForTest(runningApplications, finishedApplications);
|
||||
deletionService.init(conf);
|
||||
deletionService.start();
|
||||
|
||||
int timeout = 2000;
|
||||
verify(mockFs, timeout(timeout)).delete(app1.path, true);
|
||||
verify(mockFs, timeout(timeout).times(0)).delete(app2.path, true);
|
||||
verify(mockFs, timeout(timeout)).delete(app3.path, true);
|
||||
verify(mockFs, timeout(timeout).times(0)).delete(app4.path, true);
|
||||
verify(mockFs, timeout(timeout)).delete(app4Log1.path, true);
|
||||
verify(mockFs, timeout(timeout).times(0)).delete(app4Log2.path, true);
|
||||
|
||||
deletionService.stop();
|
||||
long timeout = 2000L;
|
||||
LogAggregationTestcaseBuilder.create(conf)
|
||||
.withRootPath(ROOT)
|
||||
.withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
|
||||
.withUserDir(USER_ME, toKeepTime)
|
||||
.withSuffixDir(SUFFIX, toDeleteTime)
|
||||
.withBucketDir(toDeleteTime)
|
||||
.withApps(Lists.newArrayList(
|
||||
new AppDescriptor(toDeleteTime, Lists.newArrayList()),
|
||||
new AppDescriptor(toDeleteTime, Lists.newArrayList(
|
||||
Pair.of(DIR_HOST1, toDeleteTime),
|
||||
Pair.of(DIR_HOST2, toKeepTime))),
|
||||
new AppDescriptor(toDeleteTime, Lists.newArrayList(
|
||||
Pair.of(DIR_HOST1, toDeleteTime),
|
||||
Pair.of(DIR_HOST2, toDeleteTime))),
|
||||
new AppDescriptor(toDeleteTime, Lists.newArrayList(
|
||||
Pair.of(DIR_HOST1, toDeleteTime),
|
||||
Pair.of(DIR_HOST2, toKeepTime)))))
|
||||
.withFinishedApps(1, 2, 3)
|
||||
.withRunningApps(4)
|
||||
.injectExceptionForAppDirDeletion(3)
|
||||
.build()
|
||||
.setupAndRunDeletionService()
|
||||
.verifyAppDirsDeleted(timeout, 1, 3)
|
||||
.verifyAppDirsNotDeleted(timeout, 2, 4)
|
||||
.verifyAppFileDeleted(4, 1, timeout)
|
||||
.verifyAppFileNotDeleted(4, 2, timeout)
|
||||
.teardown();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -224,74 +136,47 @@ public class TestAggregatedLogDeletionService {
|
|||
|
||||
Configuration conf = setupConfiguration(1800, 1);
|
||||
|
||||
Path rootPath = new Path(ROOT);
|
||||
FileSystem rootFs = rootPath.getFileSystem(conf);
|
||||
FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
|
||||
|
||||
ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
|
||||
|
||||
Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
|
||||
|
||||
PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
|
||||
before50Secs);
|
||||
PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
|
||||
before50Secs);
|
||||
PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
|
||||
USER_ME, SUFFIX, appId1, before50Secs);
|
||||
|
||||
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[] {userDir.fileStatus});
|
||||
|
||||
//Set time last modified of app1Dir directory and its files to before2000Secs
|
||||
PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
|
||||
USER_ME, SUFFIX, before2000Secs);
|
||||
|
||||
//Set time last modified of app1Dir directory and its files to before50Secs
|
||||
PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
|
||||
USER_ME, SUFFIX, before50Secs);
|
||||
|
||||
when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
|
||||
when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
|
||||
when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[]{app1.fileStatus,
|
||||
app2.fileStatus});
|
||||
|
||||
PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1,
|
||||
before2000Secs);
|
||||
PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
|
||||
before50Secs);
|
||||
|
||||
when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[] {app1Log1.fileStatus});
|
||||
when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[] {app2Log1.fileStatus});
|
||||
|
||||
final List<ApplicationId> finishedApplications =
|
||||
Collections.unmodifiableList(Arrays.asList(appId1, appId2));
|
||||
|
||||
AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
|
||||
finishedApplications, conf);
|
||||
LogAggregationTestcase testcase = LogAggregationTestcaseBuilder.create(conf)
|
||||
.withRootPath(ROOT)
|
||||
.withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
|
||||
.withUserDir(USER_ME, before50Secs)
|
||||
.withSuffixDir(SUFFIX, before50Secs)
|
||||
.withBucketDir(before50Secs)
|
||||
.withApps(Lists.newArrayList(
|
||||
//Set time last modified of app1Dir directory and its files to before2000Secs
|
||||
new AppDescriptor(before2000Secs, Lists.newArrayList(
|
||||
Pair.of(DIR_HOST1, before2000Secs))),
|
||||
//Set time last modified of app1Dir directory and its files to before50Secs
|
||||
new AppDescriptor(before50Secs, Lists.newArrayList(
|
||||
Pair.of(DIR_HOST1, before50Secs))))
|
||||
)
|
||||
.withFinishedApps(1, 2)
|
||||
.withRunningApps()
|
||||
.build();
|
||||
|
||||
deletionSvc.init(conf);
|
||||
deletionSvc.start();
|
||||
|
||||
//app1Dir would be deleted since its done above log retention period
|
||||
verify(mockFs, timeout(10000)).delete(app1.path, true);
|
||||
//app2Dir is not expected to be deleted since it is below the threshold
|
||||
verify(mockFs, timeout(3000).times(0)).delete(app2.path, true);
|
||||
testcase
|
||||
.setupAndRunDeletionService()
|
||||
//app1Dir would be deleted since it is done above log retention period
|
||||
.verifyAppDirDeleted(1, 10000L)
|
||||
//app2Dir is not expected to be deleted since it is below the threshold
|
||||
.verifyAppDirNotDeleted(2, 3000L);
|
||||
|
||||
//Now, let's change the confs
|
||||
//Now, let's change the log aggregation retention configs
|
||||
conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, 50);
|
||||
conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
|
||||
checkIntervalSeconds);
|
||||
//We have not called refreshLogSettings,hence don't expect to see the changed conf values
|
||||
assertTrue(checkIntervalMilliSeconds != deletionSvc.getCheckIntervalMsecs());
|
||||
|
||||
//refresh the log settings
|
||||
deletionSvc.refreshLogRetentionSettings();
|
||||
|
||||
//Check interval time should reflect the new value
|
||||
Assert.assertEquals(checkIntervalMilliSeconds, deletionSvc.getCheckIntervalMsecs());
|
||||
//app2Dir should be deleted since it falls above the threshold
|
||||
verify(mockFs, timeout(10000)).delete(app2.path, true);
|
||||
deletionSvc.stop();
|
||||
testcase
|
||||
//We have not called refreshLogSettings, hence don't expect to see
|
||||
// the changed conf values
|
||||
.verifyCheckIntervalMilliSecondsNotEqualTo(checkIntervalMilliSeconds)
|
||||
//refresh the log settings
|
||||
.refreshLogRetentionSettings()
|
||||
//Check interval time should reflect the new value
|
||||
.verifyCheckIntervalMilliSecondsEqualTo(checkIntervalMilliSeconds)
|
||||
//app2Dir should be deleted since it falls above the threshold
|
||||
.verifyAppDirDeleted(2, 10000L)
|
||||
.teardown();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -303,52 +188,30 @@ public class TestAggregatedLogDeletionService {
|
|||
|
||||
// prevent us from picking up the same mockfs instance from another test
|
||||
FileSystem.closeAll();
|
||||
Path rootPath = new Path(ROOT);
|
||||
FileSystem rootFs = rootPath.getFileSystem(conf);
|
||||
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
|
||||
|
||||
Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
|
||||
|
||||
PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, now);
|
||||
PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, now);
|
||||
|
||||
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
|
||||
|
||||
ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
|
||||
USER_ME, SUFFIX, appId1, now);
|
||||
|
||||
PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
|
||||
USER_ME, SUFFIX, now);
|
||||
PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1, now);
|
||||
|
||||
when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
|
||||
when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
|
||||
when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus});
|
||||
when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
|
||||
|
||||
final List<ApplicationId> finishedApplications = Collections.singletonList(appId1);
|
||||
|
||||
AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
|
||||
finishedApplications);
|
||||
deletionSvc.init(conf);
|
||||
deletionSvc.start();
|
||||
|
||||
verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
|
||||
verify(mockFs, never()).delete(app1.path, true);
|
||||
|
||||
// modify the timestamp of the logs and verify if it's picked up quickly
|
||||
app1.changeModificationTime(toDeleteTime);
|
||||
app1Log1.changeModificationTime(toDeleteTime);
|
||||
bucketDir.changeModificationTime(toDeleteTime);
|
||||
when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
|
||||
when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus });
|
||||
when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus });
|
||||
when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
|
||||
|
||||
verify(mockFs, timeout(10000)).delete(app1.path, true);
|
||||
|
||||
deletionSvc.stop();
|
||||
LogAggregationTestcaseBuilder.create(conf)
|
||||
.withRootPath(ROOT)
|
||||
.withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
|
||||
.withUserDir(USER_ME, now)
|
||||
.withSuffixDir(SUFFIX, now)
|
||||
.withBucketDir(now)
|
||||
.withApps(Lists.newArrayList(
|
||||
new AppDescriptor(now,
|
||||
Lists.newArrayList(Pair.of(DIR_HOST1, now))),
|
||||
new AppDescriptor(now)))
|
||||
.withFinishedApps(1)
|
||||
.withRunningApps()
|
||||
.build()
|
||||
.setupAndRunDeletionService()
|
||||
.verifyAnyPathListedAtLeast(4, 10000L)
|
||||
.verifyAppDirNotDeleted(1, NO_TIMEOUT)
|
||||
// modify the timestamp of the logs and verify if it is picked up quickly
|
||||
.changeModTimeOfApp(1, toDeleteTime)
|
||||
.changeModTimeOfAppLogDir(1, 1, toDeleteTime)
|
||||
.changeModTimeOfBucketDir(toDeleteTime)
|
||||
.reinitAllPaths()
|
||||
.verifyAppDirDeleted(1, 10000L)
|
||||
.teardown();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -357,44 +220,25 @@ public class TestAggregatedLogDeletionService {
|
|||
|
||||
// prevent us from picking up the same mockfs instance from another test
|
||||
FileSystem.closeAll();
|
||||
Path rootPath = new Path(ROOT);
|
||||
FileSystem rootFs = rootPath.getFileSystem(conf);
|
||||
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
|
||||
long modTime = 0L;
|
||||
|
||||
Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
|
||||
|
||||
PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, 0);
|
||||
PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, 0);
|
||||
PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(suffixDir.path, "0", 0);
|
||||
|
||||
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
|
||||
when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
|
||||
when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
|
||||
|
||||
ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
|
||||
ApplicationId appId3 = ApplicationId.newInstance(System.currentTimeMillis(), 3);
|
||||
|
||||
PathWithFileStatus app1 = createDirLogPathWithFileStatus(bucketDir.path, appId1.toString(), 0);
|
||||
PathWithFileStatus app2 = createDirLogPathWithFileStatus(bucketDir.path, "application_a", 0);
|
||||
PathWithFileStatus app3 = createDirLogPathWithFileStatus(bucketDir.path, appId3.toString(), 0);
|
||||
PathWithFileStatus app3Log3 = createDirLogPathWithFileStatus(app3.path, DIR_HOST1, 0);
|
||||
|
||||
when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[]{
|
||||
app1.fileStatus,app2.fileStatus, app3.fileStatus});
|
||||
when(mockFs.listStatus(app1.path)).thenThrow(
|
||||
new RuntimeException("Should be caught and logged"));
|
||||
when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[]{});
|
||||
when(mockFs.listStatus(app3.path)).thenReturn(new FileStatus[]{app3Log3.fileStatus});
|
||||
|
||||
final List<ApplicationId> finishedApplications = Collections.unmodifiableList(
|
||||
Arrays.asList(appId1, appId3));
|
||||
|
||||
ApplicationClientProtocol rmClient = createMockRMClient(finishedApplications, null);
|
||||
AggregatedLogDeletionService.LogDeletionTask deletionTask =
|
||||
new AggregatedLogDeletionService.LogDeletionTask(conf, TEN_DAYS_IN_SECONDS, rmClient);
|
||||
deletionTask.run();
|
||||
verify(mockFs).delete(app3.path, true);
|
||||
LogAggregationTestcaseBuilder.create(conf)
|
||||
.withRootPath(ROOT)
|
||||
.withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
|
||||
.withUserDir(USER_ME, modTime)
|
||||
.withSuffixDir(SUFFIX, modTime)
|
||||
.withBucketDir(modTime, "0")
|
||||
.withApps(Lists.newArrayList(
|
||||
new AppDescriptor(modTime),
|
||||
new AppDescriptor(modTime),
|
||||
new AppDescriptor(modTime, Lists.newArrayList(Pair.of(DIR_HOST1, modTime)))))
|
||||
.withAdditionalAppDirs(Lists.newArrayList(Pair.of("application_a", modTime)))
|
||||
.withFinishedApps(1, 3)
|
||||
.withRunningApps()
|
||||
.injectExceptionForAppDirDeletion(1)
|
||||
.build()
|
||||
.runDeletionTask(TEN_DAYS_IN_SECONDS)
|
||||
.verifyAppDirDeleted(3, NO_TIMEOUT);
|
||||
}
|
||||
|
||||
static class MockFileSystem extends FilterFileSystem {
|
||||
|
@ -403,98 +247,10 @@ public class TestAggregatedLogDeletionService {
|
|||
}
|
||||
|
||||
public void initialize(URI name, Configuration conf) throws IOException {}
|
||||
}
|
||||
|
||||
private static ApplicationClientProtocol createMockRMClient(
|
||||
List<ApplicationId> finishedApplications,
|
||||
List<ApplicationId> runningApplications) throws Exception {
|
||||
final ApplicationClientProtocol mockProtocol = mock(ApplicationClientProtocol.class);
|
||||
if (finishedApplications != null && !finishedApplications.isEmpty()) {
|
||||
for (ApplicationId appId : finishedApplications) {
|
||||
GetApplicationReportRequest request = GetApplicationReportRequest.newInstance(appId);
|
||||
GetApplicationReportResponse response = createApplicationReportWithFinishedApplication();
|
||||
when(mockProtocol.getApplicationReport(request)).thenReturn(response);
|
||||
}
|
||||
}
|
||||
if (runningApplications != null && !runningApplications.isEmpty()) {
|
||||
for (ApplicationId appId : runningApplications) {
|
||||
GetApplicationReportRequest request = GetApplicationReportRequest.newInstance(appId);
|
||||
GetApplicationReportResponse response = createApplicationReportWithRunningApplication();
|
||||
when(mockProtocol.getApplicationReport(request)).thenReturn(response);
|
||||
}
|
||||
}
|
||||
return mockProtocol;
|
||||
}
|
||||
|
||||
private static GetApplicationReportResponse createApplicationReportWithRunningApplication() {
|
||||
ApplicationReport report = mock(ApplicationReport.class);
|
||||
when(report.getYarnApplicationState()).thenReturn(
|
||||
YarnApplicationState.RUNNING);
|
||||
GetApplicationReportResponse response =
|
||||
mock(GetApplicationReportResponse.class);
|
||||
when(response.getApplicationReport()).thenReturn(report);
|
||||
return response;
|
||||
}
|
||||
|
||||
private static GetApplicationReportResponse createApplicationReportWithFinishedApplication() {
|
||||
ApplicationReport report = mock(ApplicationReport.class);
|
||||
when(report.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED);
|
||||
GetApplicationReportResponse response = mock(GetApplicationReportResponse.class);
|
||||
when(response.getApplicationReport()).thenReturn(report);
|
||||
return response;
|
||||
}
|
||||
|
||||
private static class PathWithFileStatus {
|
||||
private final Path path;
|
||||
private FileStatus fileStatus;
|
||||
|
||||
PathWithFileStatus(Path path, FileStatus fileStatus) {
|
||||
this.path = path;
|
||||
this.fileStatus = fileStatus;
|
||||
}
|
||||
|
||||
public void changeModificationTime(long modTime) {
|
||||
fileStatus = new FileStatus(fileStatus.getLen(), fileStatus.isDirectory(),
|
||||
fileStatus.getReplication(),
|
||||
fileStatus.getBlockSize(), modTime, fileStatus.getPath());
|
||||
}
|
||||
}
|
||||
|
||||
private static class AggregatedLogDeletionServiceForTest extends AggregatedLogDeletionService {
|
||||
private final List<ApplicationId> finishedApplications;
|
||||
private final List<ApplicationId> runningApplications;
|
||||
private final Configuration conf;
|
||||
|
||||
AggregatedLogDeletionServiceForTest(List<ApplicationId> runningApplications,
|
||||
List<ApplicationId> finishedApplications) {
|
||||
this(runningApplications, finishedApplications, null);
|
||||
}
|
||||
|
||||
AggregatedLogDeletionServiceForTest(List<ApplicationId> runningApplications,
|
||||
List<ApplicationId> finishedApplications,
|
||||
Configuration conf) {
|
||||
this.runningApplications = runningApplications;
|
||||
this.finishedApplications = finishedApplications;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ApplicationClientProtocol createRMClient() throws IOException {
|
||||
try {
|
||||
return createMockRMClient(finishedApplications, runningApplications);
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void stopRMClient() {
|
||||
// DO NOTHING
|
||||
public boolean hasPathCapability(Path path, String capability) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,24 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.yarn.logaggregation.filecontroller;
|
||||
|
||||
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
|
||||
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT;
|
||||
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Writer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -56,6 +38,21 @@ import org.junit.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Writer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS;
|
||||
import static org.apache.hadoop.yarn.logaggregation.LogAggregationTestUtils.REMOTE_LOG_ROOT;
|
||||
import static org.apache.hadoop.yarn.logaggregation.LogAggregationTestUtils.enableFileControllers;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* Test LogAggregationFileControllerFactory.
|
||||
*/
|
||||
|
@ -63,7 +60,6 @@ public class TestLogAggregationFileControllerFactory extends Configured {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
TestLogAggregationFileControllerFactory.class);
|
||||
|
||||
private static final String REMOTE_LOG_ROOT = "target/app-logs/";
|
||||
private static final String REMOTE_DEFAULT_DIR = "default/";
|
||||
private static final String APP_OWNER = "test";
|
||||
|
||||
|
@ -87,8 +83,7 @@ public class TestLogAggregationFileControllerFactory extends Configured {
|
|||
public void setup() throws IOException {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_LOG_ROOT +
|
||||
REMOTE_DEFAULT_DIR);
|
||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_LOG_ROOT + REMOTE_DEFAULT_DIR);
|
||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "log");
|
||||
setConf(conf);
|
||||
}
|
||||
|
@ -143,36 +138,15 @@ public class TestLogAggregationFileControllerFactory extends Configured {
|
|||
@Test(expected = Exception.class)
|
||||
public void testLogAggregationFileControllerFactoryClassNotSet() {
|
||||
Configuration conf = getConf();
|
||||
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
|
||||
"TestLogAggregationFileController");
|
||||
conf.set(LOG_AGGREGATION_FILE_FORMATS, "TestLogAggregationFileController");
|
||||
new LogAggregationFileControllerFactory(conf);
|
||||
fail("TestLogAggregationFileController's class was not set, " +
|
||||
"but the factory creation did not fail.");
|
||||
}
|
||||
|
||||
private void enableFileControllers(
|
||||
List<Class<? extends LogAggregationFileController>> fileControllers,
|
||||
List<String> fileControllerNames) {
|
||||
Configuration conf = getConf();
|
||||
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
|
||||
StringUtils.join(fileControllerNames, ","));
|
||||
for (int i = 0; i < fileControllers.size(); i++) {
|
||||
Class<? extends LogAggregationFileController> fileController =
|
||||
fileControllers.get(i);
|
||||
String controllerName = fileControllerNames.get(i);
|
||||
|
||||
conf.setClass(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT,
|
||||
controllerName), fileController, LogAggregationFileController.class);
|
||||
conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
|
||||
controllerName), REMOTE_LOG_ROOT + controllerName + "/");
|
||||
conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
|
||||
controllerName), controllerName);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLogAggregationFileControllerFactory() throws Exception {
|
||||
enableFileControllers(ALL_FILE_CONTROLLERS, ALL_FILE_CONTROLLER_NAMES);
|
||||
enableFileControllers(getConf(), ALL_FILE_CONTROLLERS, ALL_FILE_CONTROLLER_NAMES);
|
||||
LogAggregationFileControllerFactory factory =
|
||||
new LogAggregationFileControllerFactory(getConf());
|
||||
List<LogAggregationFileController> list =
|
||||
|
@ -199,8 +173,7 @@ public class TestLogAggregationFileControllerFactory extends Configured {
|
|||
|
||||
@Test
|
||||
public void testClassConfUsed() {
|
||||
enableFileControllers(Collections.singletonList(
|
||||
LogAggregationTFileController.class),
|
||||
enableFileControllers(getConf(), Collections.singletonList(LogAggregationTFileController.class),
|
||||
Collections.singletonList("TFile"));
|
||||
LogAggregationFileControllerFactory factory =
|
||||
new LogAggregationFileControllerFactory(getConf());
|
||||
|
@ -215,7 +188,7 @@ public class TestLogAggregationFileControllerFactory extends Configured {
|
|||
@Test
|
||||
public void testNodemanagerConfigurationIsUsed() {
|
||||
Configuration conf = getConf();
|
||||
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
|
||||
conf.set(LOG_AGGREGATION_FILE_FORMATS, "TFile");
|
||||
LogAggregationFileControllerFactory factory =
|
||||
new LogAggregationFileControllerFactory(conf);
|
||||
LogAggregationFileController fc = factory.getFileControllerForWrite();
|
||||
|
@ -231,7 +204,7 @@ public class TestLogAggregationFileControllerFactory extends Configured {
|
|||
Configuration conf = getConf();
|
||||
conf.unset(YarnConfiguration.NM_REMOTE_APP_LOG_DIR);
|
||||
conf.unset(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX);
|
||||
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
|
||||
conf.set(LOG_AGGREGATION_FILE_FORMATS, "TFile");
|
||||
|
||||
LogAggregationFileControllerFactory factory =
|
||||
new LogAggregationFileControllerFactory(getConf());
|
||||
|
@ -268,20 +241,19 @@ public class TestLogAggregationFileControllerFactory extends Configured {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void initializeWriter(LogAggregationFileControllerContext context)
|
||||
throws IOException {
|
||||
public void initializeWriter(LogAggregationFileControllerContext context) {
|
||||
// Do Nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
|
||||
OutputStream os) throws IOException {
|
||||
OutputStream os) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ContainerLogMeta> readAggregatedLogsMeta(
|
||||
ContainerLogsRequest logRequest) throws IOException {
|
||||
ContainerLogsRequest logRequest) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.logaggregation.testutils;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.yarn.logaggregation.testutils.MockRMClientUtils.createMockRMClient;
|
||||
|
||||
public class AggregatedLogDeletionServiceForTest extends AggregatedLogDeletionService {
|
||||
private final List<ApplicationId> finishedApplications;
|
||||
private final List<ApplicationId> runningApplications;
|
||||
private final Configuration conf;
|
||||
|
||||
public AggregatedLogDeletionServiceForTest(List<ApplicationId> runningApplications,
|
||||
List<ApplicationId> finishedApplications) {
|
||||
this(runningApplications, finishedApplications, null);
|
||||
}
|
||||
|
||||
public AggregatedLogDeletionServiceForTest(List<ApplicationId> runningApplications,
|
||||
List<ApplicationId> finishedApplications,
|
||||
Configuration conf) {
|
||||
this.runningApplications = runningApplications;
|
||||
this.finishedApplications = finishedApplications;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ApplicationClientProtocol createRMClient() throws IOException {
|
||||
try {
|
||||
return createMockRMClient(finishedApplications, runningApplications);
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void stopRMClient() {
|
||||
// DO NOTHING
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.logaggregation.testutils;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
|
||||
public class FileStatusUtils {
|
||||
public static PathWithFileStatus createPathWithFileStatusForAppId(Path remoteRootLogDir,
|
||||
ApplicationId appId,
|
||||
String user, String suffix,
|
||||
long modificationTime) {
|
||||
Path path = LogAggregationUtils.getRemoteAppLogDir(
|
||||
remoteRootLogDir, appId, user, suffix);
|
||||
FileStatus fileStatus = createEmptyFileStatus(modificationTime, path);
|
||||
return new PathWithFileStatus(path, fileStatus);
|
||||
}
|
||||
|
||||
public static FileStatus createEmptyFileStatus(long modificationTime, Path path) {
|
||||
return new FileStatus(0, true, 0, 0, modificationTime, path);
|
||||
}
|
||||
|
||||
public static PathWithFileStatus createFileLogPathWithFileStatus(Path baseDir, String childDir,
|
||||
long modificationTime) {
|
||||
Path logPath = new Path(baseDir, childDir);
|
||||
FileStatus fStatus = createFileStatusWithLengthForFile(10, modificationTime, logPath);
|
||||
return new PathWithFileStatus(logPath, fStatus);
|
||||
}
|
||||
|
||||
public static PathWithFileStatus createDirLogPathWithFileStatus(Path baseDir, String childDir,
|
||||
long modificationTime) {
|
||||
Path logPath = new Path(baseDir, childDir);
|
||||
FileStatus fStatus = createFileStatusWithLengthForDir(10, modificationTime, logPath);
|
||||
return new PathWithFileStatus(logPath, fStatus);
|
||||
}
|
||||
|
||||
public static PathWithFileStatus createDirBucketDirLogPathWithFileStatus(Path remoteRootLogPath,
|
||||
String user,
|
||||
String suffix,
|
||||
ApplicationId appId,
|
||||
long modificationTime) {
|
||||
Path bucketDir = LogAggregationUtils.getRemoteBucketDir(remoteRootLogPath, user, suffix, appId);
|
||||
FileStatus fStatus = new FileStatus(0, true, 0, 0, modificationTime, bucketDir);
|
||||
return new PathWithFileStatus(bucketDir, fStatus);
|
||||
}
|
||||
|
||||
public static FileStatus createFileStatusWithLengthForFile(long length,
|
||||
long modificationTime,
|
||||
Path logPath) {
|
||||
return new FileStatus(length, false, 1, 1, modificationTime, logPath);
|
||||
}
|
||||
|
||||
public static FileStatus createFileStatusWithLengthForDir(long length,
|
||||
long modificationTime,
|
||||
Path logPath) {
|
||||
return new FileStatus(length, true, 1, 1, modificationTime, logPath);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,421 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.logaggregation.testutils;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FilterFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.util.Sets;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
|
||||
import org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.AppDescriptor;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hadoop.yarn.logaggregation.testutils.FileStatusUtils.*;
|
||||
import static org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.NO_TIMEOUT;
|
||||
import static org.apache.hadoop.yarn.logaggregation.testutils.MockRMClientUtils.createMockRMClient;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class LogAggregationTestcase {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(LogAggregationTestcase.class);
|
||||
|
||||
private final Configuration conf;
|
||||
private final long now;
|
||||
private PathWithFileStatus bucketDir;
|
||||
private final long bucketDirModTime;
|
||||
private PathWithFileStatus userDir;
|
||||
private final String userDirName;
|
||||
private final long userDirModTime;
|
||||
private PathWithFileStatus suffixDir;
|
||||
private final String suffix;
|
||||
private final String suffixDirName;
|
||||
private final long suffixDirModTime;
|
||||
private final String bucketId;
|
||||
private final Path remoteRootLogPath;
|
||||
private final Map<Integer, Exception> injectedAppDirDeletionExceptions;
|
||||
private final List<String> fileControllers;
|
||||
private final List<Pair<String, Long>> additionalAppDirs;
|
||||
|
||||
private final List<ApplicationId> applicationIds = new ArrayList<>();
|
||||
private final int[] runningAppIds;
|
||||
private final int[] finishedAppIds;
|
||||
private final List<List<PathWithFileStatus>> appFiles = new ArrayList<>();
|
||||
private final FileSystem mockFs;
|
||||
private List<PathWithFileStatus> appDirs;
|
||||
private final List<AppDescriptor> appDescriptors;
|
||||
private AggregatedLogDeletionServiceForTest deletionService;
|
||||
|
||||
public LogAggregationTestcase(LogAggregationTestcaseBuilder builder) throws IOException {
|
||||
conf = builder.conf;
|
||||
now = builder.now;
|
||||
bucketDir = builder.bucketDir;
|
||||
bucketDirModTime = builder.bucketDirModTime;
|
||||
userDir = builder.userDir;
|
||||
userDirName = builder.userDirName;
|
||||
userDirModTime = builder.userDirModTime;
|
||||
suffix = builder.suffix;
|
||||
suffixDir = builder.suffixDir;
|
||||
suffixDirName = builder.suffixDirName;
|
||||
suffixDirModTime = builder.suffixDirModTime;
|
||||
bucketId = builder.bucketId;
|
||||
appDescriptors = builder.apps;
|
||||
runningAppIds = builder.runningAppIds;
|
||||
finishedAppIds = builder.finishedAppIds;
|
||||
remoteRootLogPath = builder.remoteRootLogPath;
|
||||
injectedAppDirDeletionExceptions = builder.injectedAppDirDeletionExceptions;
|
||||
fileControllers = builder.fileControllers;
|
||||
additionalAppDirs = builder.additionalAppDirs;
|
||||
|
||||
mockFs = ((FilterFileSystem) builder.rootFs).getRawFileSystem();
|
||||
validateAppControllers();
|
||||
setupMocks();
|
||||
}
|
||||
|
||||
private void validateAppControllers() {
|
||||
Set<String> controllers = appDescriptors.stream()
|
||||
.map(a -> a.fileController)
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toSet());
|
||||
Set<String> availableControllers = fileControllers != null ?
|
||||
new HashSet<>(this.fileControllers) : Sets.newHashSet();
|
||||
Set<String> difference = Sets.difference(controllers, availableControllers);
|
||||
if (!difference.isEmpty()) {
|
||||
throw new IllegalStateException(String.format("Invalid controller defined!" +
|
||||
" Available: %s, Actual: %s", availableControllers, controllers));
|
||||
}
|
||||
}
|
||||
|
||||
private void setupMocks() throws IOException {
|
||||
createApplicationsByDescriptors();
|
||||
|
||||
List<Path> rootPaths = determineRootPaths();
|
||||
for (Path rootPath : rootPaths) {
|
||||
String controllerName = rootPath.getName();
|
||||
ApplicationId arbitraryAppIdForBucketDir = this.applicationIds.get(0);
|
||||
userDir = createDirLogPathWithFileStatus(rootPath, userDirName, userDirModTime);
|
||||
suffixDir = createDirLogPathWithFileStatus(userDir.path, suffixDirName, suffixDirModTime);
|
||||
if (bucketId != null) {
|
||||
bucketDir = createDirLogPathWithFileStatus(suffixDir.path, bucketId, bucketDirModTime);
|
||||
} else {
|
||||
bucketDir = createDirBucketDirLogPathWithFileStatus(rootPath, userDirName, suffix,
|
||||
arbitraryAppIdForBucketDir, bucketDirModTime);
|
||||
}
|
||||
setupListStatusForPath(rootPath, userDir);
|
||||
initFileSystemListings(controllerName);
|
||||
}
|
||||
}
|
||||
|
||||
private List<Path> determineRootPaths() {
|
||||
List<Path> rootPaths = new ArrayList<>();
|
||||
if (fileControllers != null && !fileControllers.isEmpty()) {
|
||||
for (String fileController : fileControllers) {
|
||||
//Generic path: <remote-app-log-dir>/<user>/bucket-<suffix>/<bucket id>/
|
||||
// <application id>/<NodeManager id>
|
||||
|
||||
//remoteRootLogPath: <remote-app-log-dir>/
|
||||
//example: mockfs://foo/tmp/logs/
|
||||
|
||||
//userDir: <remote-app-log-dir>/<user>/
|
||||
//example: mockfs://foo/tmp/logs/me/
|
||||
|
||||
//suffixDir: <remote-app-log-dir>/<user>/bucket-<suffix>/
|
||||
//example: mockfs://foo/tmp/logs/me/bucket-logs/
|
||||
|
||||
//bucketDir: <remote-app-log-dir>/<user>/bucket-<suffix>/<bucket id>/
|
||||
//example: mockfs://foo/tmp/logs/me/bucket-logs/0001/
|
||||
|
||||
//remoteRootLogPath with controller: <remote-app-log-dir>/<controllerName>
|
||||
//example: mockfs://foo/tmp/logs/IFile
|
||||
rootPaths.add(new Path(remoteRootLogPath, fileController));
|
||||
}
|
||||
} else {
|
||||
rootPaths.add(remoteRootLogPath);
|
||||
}
|
||||
return rootPaths;
|
||||
}
|
||||
|
||||
private void initFileSystemListings(String controllerName) throws IOException {
|
||||
setupListStatusForPath(userDir, suffixDir);
|
||||
setupListStatusForPath(suffixDir, bucketDir);
|
||||
setupListStatusForPath(bucketDir, appDirs.stream()
|
||||
.filter(app -> app.path.toString().contains(controllerName))
|
||||
.map(app -> app.fileStatus)
|
||||
.toArray(FileStatus[]::new));
|
||||
|
||||
for (Pair<String, Long> appDirPair : additionalAppDirs) {
|
||||
PathWithFileStatus appDir = createDirLogPathWithFileStatus(bucketDir.path,
|
||||
appDirPair.getLeft(), appDirPair.getRight());
|
||||
setupListStatusForPath(appDir, new FileStatus[] {});
|
||||
}
|
||||
}
|
||||
|
||||
private void createApplicationsByDescriptors() throws IOException {
|
||||
int len = appDescriptors.size();
|
||||
appDirs = new ArrayList<>(len);
|
||||
|
||||
for (int i = 0; i < len; i++) {
|
||||
AppDescriptor appDesc = appDescriptors.get(i);
|
||||
ApplicationId applicationId = appDesc.createApplicationId(now, i + 1);
|
||||
applicationIds.add(applicationId);
|
||||
Path basePath = this.remoteRootLogPath;
|
||||
if (appDesc.fileController != null) {
|
||||
basePath = new Path(basePath, appDesc.fileController);
|
||||
}
|
||||
|
||||
PathWithFileStatus appDir = createPathWithFileStatusForAppId(
|
||||
basePath, applicationId, userDirName, suffix, appDesc.modTimeOfAppDir);
|
||||
LOG.debug("Created application with ID '{}' to path '{}'", applicationId, appDir.path);
|
||||
appDirs.add(appDir);
|
||||
addAppChildrenFiles(appDesc, appDir);
|
||||
}
|
||||
|
||||
setupFsMocksForAppsAndChildrenFiles();
|
||||
|
||||
for (Map.Entry<Integer, Exception> e : injectedAppDirDeletionExceptions.entrySet()) {
|
||||
when(mockFs.delete(this.appDirs.get(e.getKey()).path, true)).thenThrow(e.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
private void setupFsMocksForAppsAndChildrenFiles() throws IOException {
|
||||
for (int i = 0; i < appDirs.size(); i++) {
|
||||
List<PathWithFileStatus> appChildren = appFiles.get(i);
|
||||
Path appPath = appDirs.get(i).path;
|
||||
setupListStatusForPath(appPath,
|
||||
appChildren.stream()
|
||||
.map(child -> child.fileStatus)
|
||||
.toArray(FileStatus[]::new));
|
||||
}
|
||||
}
|
||||
|
||||
private void setupListStatusForPath(Path dir, PathWithFileStatus pathWithFileStatus)
|
||||
throws IOException {
|
||||
setupListStatusForPath(dir, new FileStatus[]{pathWithFileStatus.fileStatus});
|
||||
}
|
||||
|
||||
private void setupListStatusForPath(PathWithFileStatus dir, PathWithFileStatus pathWithFileStatus)
|
||||
throws IOException {
|
||||
setupListStatusForPath(dir, new FileStatus[]{pathWithFileStatus.fileStatus});
|
||||
}
|
||||
|
||||
private void setupListStatusForPath(Path dir, FileStatus[] fileStatuses) throws IOException {
|
||||
LOG.debug("Setting up listStatus. Parent: {}, files: {}", dir, fileStatuses);
|
||||
when(mockFs.listStatus(dir)).thenReturn(fileStatuses);
|
||||
}
|
||||
|
||||
private void setupListStatusForPath(PathWithFileStatus dir, FileStatus[] fileStatuses)
|
||||
throws IOException {
|
||||
LOG.debug("Setting up listStatus. Parent: {}, files: {}", dir.path, fileStatuses);
|
||||
when(mockFs.listStatus(dir.path)).thenReturn(fileStatuses);
|
||||
}
|
||||
|
||||
public LogAggregationTestcase setupAndRunDeletionService() {
|
||||
List<ApplicationId> finishedApps = createFinishedAppsList();
|
||||
List<ApplicationId> runningApps = createRunningAppsList();
|
||||
deletionService = new AggregatedLogDeletionServiceForTest(runningApps, finishedApps, conf);
|
||||
deletionService.init(conf);
|
||||
deletionService.start();
|
||||
return this;
|
||||
}
|
||||
|
||||
private List<ApplicationId> createRunningAppsList() {
|
||||
List<ApplicationId> runningApps = new ArrayList<>();
|
||||
for (int i : runningAppIds) {
|
||||
ApplicationId appId = this.applicationIds.get(i - 1);
|
||||
runningApps.add(appId);
|
||||
}
|
||||
return runningApps;
|
||||
}
|
||||
|
||||
private List<ApplicationId> createFinishedAppsList() {
|
||||
List<ApplicationId> finishedApps = new ArrayList<>();
|
||||
for (int i : finishedAppIds) {
|
||||
ApplicationId appId = this.applicationIds.get(i - 1);
|
||||
finishedApps.add(appId);
|
||||
}
|
||||
return finishedApps;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase runDeletionTask(long retentionSeconds) throws Exception {
|
||||
List<ApplicationId> finishedApps = createFinishedAppsList();
|
||||
List<ApplicationId> runningApps = createRunningAppsList();
|
||||
ApplicationClientProtocol rmClient = createMockRMClient(finishedApps, runningApps);
|
||||
AggregatedLogDeletionService.LogDeletionTask deletionTask =
|
||||
new AggregatedLogDeletionService.LogDeletionTask(conf, retentionSeconds, rmClient);
|
||||
deletionTask.run();
|
||||
return this;
|
||||
}
|
||||
|
||||
private void addAppChildrenFiles(AppDescriptor appDesc, PathWithFileStatus appDir) {
|
||||
List<PathWithFileStatus> appChildren = new ArrayList<>();
|
||||
for (Pair<String, Long> fileWithModDate : appDesc.filesWithModDate) {
|
||||
PathWithFileStatus appChildFile = createFileLogPathWithFileStatus(appDir.path,
|
||||
fileWithModDate.getLeft(),
|
||||
fileWithModDate.getRight());
|
||||
appChildren.add(appChildFile);
|
||||
}
|
||||
this.appFiles.add(appChildren);
|
||||
}
|
||||
|
||||
public LogAggregationTestcase verifyAppDirsDeleted(long timeout, int... ids) throws IOException {
|
||||
for (int id : ids) {
|
||||
verifyAppDirDeleted(id, timeout);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase verifyAppDirsNotDeleted(long timeout, int... ids)
|
||||
throws IOException {
|
||||
for (int id : ids) {
|
||||
verifyAppDirNotDeleted(id, timeout);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase verifyAppDirDeleted(int id, long timeout) throws IOException {
|
||||
verifyAppDirDeletion(id, 1, timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase verifyAppDirNotDeleted(int id, long timeout) throws IOException {
|
||||
verifyAppDirDeletion(id, 0, timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase verifyAppFilesDeleted(long timeout,
|
||||
List<Pair<Integer, Integer>> pairs)
|
||||
throws IOException {
|
||||
for (Pair<Integer, Integer> pair : pairs) {
|
||||
verifyAppFileDeleted(pair.getLeft(), pair.getRight(), timeout);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase verifyAppFilesNotDeleted(long timeout,
|
||||
List<Pair<Integer, Integer>> pairs)
|
||||
throws IOException {
|
||||
for (Pair<Integer, Integer> pair : pairs) {
|
||||
verifyAppFileNotDeleted(pair.getLeft(), pair.getRight(), timeout);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase verifyAppFileDeleted(int id, int fileNo, long timeout)
|
||||
throws IOException {
|
||||
verifyAppFileDeletion(id, fileNo, 1, timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase verifyAppFileNotDeleted(int id, int fileNo, long timeout)
|
||||
throws IOException {
|
||||
verifyAppFileDeletion(id, fileNo, 0, timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
private void verifyAppDirDeletion(int id, int times, long timeout) throws IOException {
|
||||
if (timeout == NO_TIMEOUT) {
|
||||
verify(mockFs, times(times)).delete(this.appDirs.get(id - 1).path, true);
|
||||
} else {
|
||||
verify(mockFs, timeout(timeout).times(times)).delete(this.appDirs.get(id - 1).path, true);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyAppFileDeletion(int appId, int fileNo, int times, long timeout)
|
||||
throws IOException {
|
||||
List<PathWithFileStatus> childrenFiles = this.appFiles.get(appId - 1);
|
||||
PathWithFileStatus file = childrenFiles.get(fileNo - 1);
|
||||
verify(mockFs, timeout(timeout).times(times)).delete(file.path, true);
|
||||
}
|
||||
|
||||
public void teardown() {
|
||||
deletionService.stop();
|
||||
}
|
||||
|
||||
public LogAggregationTestcase refreshLogRetentionSettings() throws IOException {
|
||||
deletionService.refreshLogRetentionSettings();
|
||||
return this;
|
||||
}
|
||||
|
||||
public AggregatedLogDeletionService getDeletionService() {
|
||||
return deletionService;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase verifyCheckIntervalMilliSecondsEqualTo(
|
||||
int checkIntervalMilliSeconds) {
|
||||
assertEquals(checkIntervalMilliSeconds, deletionService.getCheckIntervalMsecs());
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase verifyCheckIntervalMilliSecondsNotEqualTo(
|
||||
int checkIntervalMilliSeconds) {
|
||||
assertTrue(checkIntervalMilliSeconds != deletionService.getCheckIntervalMsecs());
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase verifyAnyPathListedAtLeast(int atLeast, long timeout)
|
||||
throws IOException {
|
||||
verify(mockFs, timeout(timeout).atLeast(atLeast)).listStatus(any(Path.class));
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase changeModTimeOfApp(int appId, long modTime) {
|
||||
PathWithFileStatus appDir = appDirs.get(appId - 1);
|
||||
appDir.changeModificationTime(modTime);
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase changeModTimeOfAppLogDir(int appId, int fileNo, long modTime) {
|
||||
List<PathWithFileStatus> childrenFiles = this.appFiles.get(appId - 1);
|
||||
PathWithFileStatus file = childrenFiles.get(fileNo - 1);
|
||||
file.changeModificationTime(modTime);
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase changeModTimeOfBucketDir(long modTime) {
|
||||
bucketDir.changeModificationTime(modTime);
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase reinitAllPaths() throws IOException {
|
||||
List<Path> rootPaths = determineRootPaths();
|
||||
for (Path rootPath : rootPaths) {
|
||||
String controllerName = rootPath.getName();
|
||||
initFileSystemListings(controllerName);
|
||||
}
|
||||
setupFsMocksForAppsAndChildrenFiles();
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,172 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.logaggregation.testutils;
|
||||
|
||||
import org.apache.commons.compress.utils.Lists;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hadoop.yarn.logaggregation.TestAggregatedLogDeletionService.ALL_FILE_CONTROLLER_NAMES;
|
||||
|
||||
public class LogAggregationTestcaseBuilder {
|
||||
public static final long NO_TIMEOUT = -1;
|
||||
final long now;
|
||||
final Configuration conf;
|
||||
Path remoteRootLogPath;
|
||||
String suffix;
|
||||
String userDirName;
|
||||
long userDirModTime;
|
||||
final Map<Integer, Exception> injectedAppDirDeletionExceptions = new HashMap<>();
|
||||
List<String> fileControllers;
|
||||
long suffixDirModTime;
|
||||
long bucketDirModTime;
|
||||
String suffixDirName;
|
||||
List<AppDescriptor> apps = Lists.newArrayList();
|
||||
int[] finishedAppIds;
|
||||
int[] runningAppIds;
|
||||
PathWithFileStatus userDir;
|
||||
PathWithFileStatus suffixDir;
|
||||
PathWithFileStatus bucketDir;
|
||||
String bucketId;
|
||||
List<Pair<String, Long>> additionalAppDirs = new ArrayList<>();
|
||||
FileSystem rootFs;
|
||||
|
||||
public LogAggregationTestcaseBuilder(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.now = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
public static LogAggregationTestcaseBuilder create(Configuration conf) {
|
||||
return new LogAggregationTestcaseBuilder(conf);
|
||||
}
|
||||
|
||||
public LogAggregationTestcaseBuilder withRootPath(String root) throws IOException {
|
||||
Path rootPath = new Path(root);
|
||||
rootFs = rootPath.getFileSystem(conf);
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcaseBuilder withRemoteRootLogPath(String remoteRootLogDir) {
|
||||
remoteRootLogPath = new Path(remoteRootLogDir);
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcaseBuilder withUserDir(String userDirName, long modTime) {
|
||||
this.userDirName = userDirName;
|
||||
this.userDirModTime = modTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcaseBuilder withSuffixDir(String suffix, long modTime) {
|
||||
this.suffix = suffix;
|
||||
this.suffixDirName = LogAggregationUtils.getBucketSuffix() + suffix;
|
||||
this.suffixDirModTime = modTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Bucket dir paths will be generated later.
|
||||
* @param modTime The modification time
|
||||
* @return The builder
|
||||
*/
|
||||
public LogAggregationTestcaseBuilder withBucketDir(long modTime) {
|
||||
this.bucketDirModTime = modTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcaseBuilder withBucketDir(long modTime, String bucketId) {
|
||||
this.bucketDirModTime = modTime;
|
||||
this.bucketId = bucketId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public final LogAggregationTestcaseBuilder withApps(List<AppDescriptor> apps) {
|
||||
this.apps = apps;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcaseBuilder withFinishedApps(int... apps) {
|
||||
this.finishedAppIds = apps;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcaseBuilder withRunningApps(int... apps) {
|
||||
this.runningAppIds = apps;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcaseBuilder withBothFileControllers() {
|
||||
this.fileControllers = ALL_FILE_CONTROLLER_NAMES;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcaseBuilder withAdditionalAppDirs(List<Pair<String, Long>> appDirs) {
|
||||
this.additionalAppDirs = appDirs;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcaseBuilder injectExceptionForAppDirDeletion(int... indices) {
|
||||
for (int i : indices) {
|
||||
AccessControlException e = new AccessControlException("Injected Error\nStack Trace :(");
|
||||
this.injectedAppDirDeletionExceptions.put(i, e);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public LogAggregationTestcase build() throws IOException {
|
||||
return new LogAggregationTestcase(this);
|
||||
}
|
||||
|
||||
public static final class AppDescriptor {
|
||||
final long modTimeOfAppDir;
|
||||
List<Pair<String, Long>> filesWithModDate = new ArrayList<>();
|
||||
String fileController;
|
||||
|
||||
public AppDescriptor(long modTimeOfAppDir) {
|
||||
this.modTimeOfAppDir = modTimeOfAppDir;
|
||||
}
|
||||
|
||||
public AppDescriptor(long modTimeOfAppDir, List<Pair<String, Long>> filesWithModDate) {
|
||||
this.modTimeOfAppDir = modTimeOfAppDir;
|
||||
this.filesWithModDate = filesWithModDate;
|
||||
}
|
||||
|
||||
public AppDescriptor(String fileController, long modTimeOfAppDir,
|
||||
List<Pair<String, Long>> filesWithModDate) {
|
||||
this(modTimeOfAppDir, filesWithModDate);
|
||||
this.fileController = fileController;
|
||||
}
|
||||
|
||||
|
||||
public ApplicationId createApplicationId(long now, int id) {
|
||||
return ApplicationId.newInstance(now, id);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.logaggregation.testutils;
|
||||
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class MockRMClientUtils {
|
||||
public static ApplicationClientProtocol createMockRMClient(
|
||||
List<ApplicationId> finishedApplications,
|
||||
List<ApplicationId> runningApplications) throws Exception {
|
||||
final ApplicationClientProtocol mockProtocol = mock(ApplicationClientProtocol.class);
|
||||
if (finishedApplications != null && !finishedApplications.isEmpty()) {
|
||||
for (ApplicationId appId : finishedApplications) {
|
||||
GetApplicationReportRequest request = GetApplicationReportRequest.newInstance(appId);
|
||||
GetApplicationReportResponse response = createApplicationReportWithFinishedApplication();
|
||||
when(mockProtocol.getApplicationReport(request)).thenReturn(response);
|
||||
}
|
||||
}
|
||||
if (runningApplications != null && !runningApplications.isEmpty()) {
|
||||
for (ApplicationId appId : runningApplications) {
|
||||
GetApplicationReportRequest request = GetApplicationReportRequest.newInstance(appId);
|
||||
GetApplicationReportResponse response = createApplicationReportWithRunningApplication();
|
||||
when(mockProtocol.getApplicationReport(request)).thenReturn(response);
|
||||
}
|
||||
}
|
||||
return mockProtocol;
|
||||
}
|
||||
|
||||
public static GetApplicationReportResponse createApplicationReportWithRunningApplication() {
|
||||
ApplicationReport report = mock(ApplicationReport.class);
|
||||
when(report.getYarnApplicationState()).thenReturn(
|
||||
YarnApplicationState.RUNNING);
|
||||
GetApplicationReportResponse response =
|
||||
mock(GetApplicationReportResponse.class);
|
||||
when(response.getApplicationReport()).thenReturn(report);
|
||||
return response;
|
||||
}
|
||||
|
||||
public static GetApplicationReportResponse createApplicationReportWithFinishedApplication() {
|
||||
ApplicationReport report = mock(ApplicationReport.class);
|
||||
when(report.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED);
|
||||
GetApplicationReportResponse response = mock(GetApplicationReportResponse.class);
|
||||
when(response.getApplicationReport()).thenReturn(report);
|
||||
return response;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.logaggregation.testutils;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
public class PathWithFileStatus {
|
||||
public final Path path;
|
||||
public FileStatus fileStatus;
|
||||
|
||||
public PathWithFileStatus(Path path, FileStatus fileStatus) {
|
||||
this.path = path;
|
||||
this.fileStatus = fileStatus;
|
||||
}
|
||||
|
||||
public void changeModificationTime(long modTime) {
|
||||
fileStatus = new FileStatus(fileStatus.getLen(), fileStatus.isDirectory(),
|
||||
fileStatus.getReplication(),
|
||||
fileStatus.getBlockSize(), modTime, fileStatus.getPath());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "PathWithFileStatus{" +
|
||||
"path=" + path +
|
||||
'}';
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue