YARN-11188. Only files belong to the first file controller are removed even if multiple log aggregation file controllers are configured. Contributed by Szilard Nemeth.

This commit is contained in:
9uapaw 2022-06-22 14:40:00 +02:00
parent c9ddbd210c
commit e6ecc4f3e4
5 changed files with 141 additions and 33 deletions

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.logaggregation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
@ -57,7 +59,7 @@ public class AggregatedLogDeletionService extends AbstractService {
private Timer timer = null;
private long checkIntervalMsecs;
private LogDeletionTask task;
private List<LogDeletionTask> tasks;
public static class LogDeletionTask extends TimerTask {
private Configuration conf;
@ -66,14 +68,12 @@ public class AggregatedLogDeletionService extends AbstractService {
private Path remoteRootLogDir = null;
private ApplicationClientProtocol rmClient = null;
public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClientProtocol rmClient) {
public LogDeletionTask(Configuration conf, long retentionSecs,
ApplicationClientProtocol rmClient,
LogAggregationFileController fileController) {
this.conf = conf;
this.retentionMillis = retentionSecs * 1000;
this.suffix = LogAggregationUtils.getBucketSuffix();
LogAggregationFileControllerFactory factory =
new LogAggregationFileControllerFactory(conf);
LogAggregationFileController fileController =
factory.getFileControllerForWrite();
this.remoteRootLogDir = fileController.getRemoteRootLogDir();
this.rmClient = rmClient;
}
@ -220,7 +220,7 @@ public class AggregatedLogDeletionService extends AbstractService {
@Override
protected void serviceStart() throws Exception {
scheduleLogDeletionTask();
scheduleLogDeletionTasks();
super.serviceStart();
}
@ -249,13 +249,13 @@ public class AggregatedLogDeletionService extends AbstractService {
setConfig(conf);
stopRMClient();
stopTimer();
scheduleLogDeletionTask();
scheduleLogDeletionTasks();
} else {
LOG.warn("Failed to execute refreshLogRetentionSettings : Aggregated Log Deletion Service is not started");
}
}
private void scheduleLogDeletionTask() throws IOException {
private void scheduleLogDeletionTasks() throws IOException {
Configuration conf = getConfig();
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
@ -271,9 +271,28 @@ public class AggregatedLogDeletionService extends AbstractService {
return;
}
setLogAggCheckIntervalMsecs(retentionSecs);
task = new LogDeletionTask(conf, retentionSecs, createRMClient());
timer = new Timer();
timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
tasks = createLogDeletionTasks(conf, retentionSecs, createRMClient());
for (LogDeletionTask task : tasks) {
timer = new Timer();
timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
}
}
@VisibleForTesting
public List<LogDeletionTask> createLogDeletionTasks(Configuration conf, long retentionSecs,
ApplicationClientProtocol rmClient)
throws IOException {
List<LogDeletionTask> tasks = new ArrayList<>();
LogAggregationFileControllerFactory factory = new LogAggregationFileControllerFactory(conf);
List<LogAggregationFileController> fileControllers =
factory.getConfiguredLogAggregationFileControllerList();
for (LogAggregationFileController fileController : fileControllers) {
LogDeletionTask task = new LogDeletionTask(conf, retentionSecs, rmClient,
fileController);
tasks.add(task);
}
return tasks;
}
private void stopTimer() {
@ -295,14 +314,18 @@ public class AggregatedLogDeletionService extends AbstractService {
// as @Idempotent, it will automatically take care of RM restart/failover.
@VisibleForTesting
protected ApplicationClientProtocol createRMClient() throws IOException {
return ClientRMProxy.createRMProxy(getConfig(),
ApplicationClientProtocol.class);
return ClientRMProxy.createRMProxy(getConfig(), ApplicationClientProtocol.class);
}
@VisibleForTesting
protected void stopRMClient() {
if (task != null && task.getRMClient() != null) {
RPC.stopProxy(task.getRMClient());
for (LogDeletionTask task : tasks) {
if (task != null && task.getRMClient() != null) {
RPC.stopProxy(task.getRMClient());
//The RMClient instance is the same for all deletion tasks.
//It is enough to close the RM client once
break;
}
}
}
}

View File

@ -42,6 +42,7 @@ import java.util.Arrays;
import java.util.List;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
import static org.apache.hadoop.yarn.logaggregation.LogAggregationTestUtils.enableFileControllers;
import static org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.NO_TIMEOUT;
import static org.mockito.Mockito.mock;
@ -118,12 +119,12 @@ public class TestAggregatedLogDeletionService {
.withRunningApps(4)
.injectExceptionForAppDirDeletion(3)
.build()
.setupAndRunDeletionService()
.startDeletionService()
.verifyAppDirsDeleted(timeout, 1, 3)
.verifyAppDirsNotDeleted(timeout, 2, 4)
.verifyAppFileDeleted(4, 1, timeout)
.verifyAppFileNotDeleted(4, 2, timeout)
.teardown();
.teardown(1);
}
@Test
@ -155,7 +156,7 @@ public class TestAggregatedLogDeletionService {
.build();
testcase
.setupAndRunDeletionService()
.startDeletionService()
//app1Dir would be deleted since it is done above log retention period
.verifyAppDirDeleted(1, 10000L)
//app2Dir is not expected to be deleted since it is below the threshold
@ -176,7 +177,8 @@ public class TestAggregatedLogDeletionService {
.verifyCheckIntervalMilliSecondsEqualTo(checkIntervalMilliSeconds)
//app2Dir should be deleted since it falls above the threshold
.verifyAppDirDeleted(2, 10000L)
.teardown();
//Close expected 2 times: once for refresh and once for stopping
.teardown(2);
}
@Test
@ -202,7 +204,7 @@ public class TestAggregatedLogDeletionService {
.withFinishedApps(1)
.withRunningApps()
.build()
.setupAndRunDeletionService()
.startDeletionService()
.verifyAnyPathListedAtLeast(4, 10000L)
.verifyAppDirNotDeleted(1, NO_TIMEOUT)
// modify the timestamp of the logs and verify if it is picked up quickly
@ -211,7 +213,7 @@ public class TestAggregatedLogDeletionService {
.changeModTimeOfBucketDir(toDeleteTime)
.reinitAllPaths()
.verifyAppDirDeleted(1, 10000L)
.teardown();
.teardown(1);
}
@Test
@ -241,6 +243,59 @@ public class TestAggregatedLogDeletionService {
.verifyAppDirDeleted(3, NO_TIMEOUT);
}
@Test
public void testDeletionTwoControllers() throws IOException {
long now = System.currentTimeMillis();
long toDeleteTime = now - (2000 * 1000);
long toKeepTime = now - (1500 * 1000);
Configuration conf = setupConfiguration(1800, -1);
enableFileControllers(conf, REMOTE_ROOT_LOG_DIR, ALL_FILE_CONTROLLERS,
ALL_FILE_CONTROLLER_NAMES);
long timeout = 2000L;
LogAggregationTestcaseBuilder.create(conf)
.withRootPath(ROOT)
.withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
.withBothFileControllers()
.withUserDir(USER_ME, toKeepTime)
.withSuffixDir(SUFFIX, toDeleteTime)
.withBucketDir(toDeleteTime)
.withApps(//Apps for TFile
Lists.newArrayList(
new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList()),
new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList(
Pair.of(DIR_HOST1, toDeleteTime),
Pair.of(DIR_HOST2, toKeepTime))),
new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList(
Pair.of(DIR_HOST1, toDeleteTime),
Pair.of(DIR_HOST2, toDeleteTime))),
new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList(
Pair.of(DIR_HOST1, toDeleteTime),
Pair.of(DIR_HOST2, toKeepTime))),
//Apps for IFile
new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList()),
new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList(
Pair.of(DIR_HOST1, toDeleteTime),
Pair.of(DIR_HOST2, toKeepTime))),
new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList(
Pair.of(DIR_HOST1, toDeleteTime),
Pair.of(DIR_HOST2, toDeleteTime))),
new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList(
Pair.of(DIR_HOST1, toDeleteTime),
Pair.of(DIR_HOST2, toKeepTime)))))
.withFinishedApps(1, 2, 3, 5, 6, 7)
.withRunningApps(4, 8)
.injectExceptionForAppDirDeletion(3, 6)
.build()
.startDeletionService()
.verifyAppDirsDeleted(timeout, 1, 3, 5, 7)
.verifyAppDirsNotDeleted(timeout, 2, 4, 6, 8)
.verifyAppFilesDeleted(timeout, Lists.newArrayList(Pair.of(4, 1), Pair.of(8, 1)))
.verifyAppFilesNotDeleted(timeout, Lists.newArrayList(Pair.of(4, 2), Pair.of(8, 2)))
.teardown(1);
}
static class MockFileSystem extends FilterFileSystem {
MockFileSystem() {
super(mock(FileSystem.class));

View File

@ -32,6 +32,7 @@ public class AggregatedLogDeletionServiceForTest extends AggregatedLogDeletionSe
private final List<ApplicationId> finishedApplications;
private final List<ApplicationId> runningApplications;
private final Configuration conf;
private ApplicationClientProtocol mockRMClient;
public AggregatedLogDeletionServiceForTest(List<ApplicationId> runningApplications,
List<ApplicationId> finishedApplications) {
@ -48,11 +49,16 @@ public class AggregatedLogDeletionServiceForTest extends AggregatedLogDeletionSe
@Override
protected ApplicationClientProtocol createRMClient() throws IOException {
if (mockRMClient != null) {
return mockRMClient;
}
try {
return createMockRMClient(finishedApplications, runningApplications);
mockRMClient =
createMockRMClient(finishedApplications, runningApplications);
} catch (Exception e) {
throw new IOException(e);
}
return mockRMClient;
}
@Override
@ -60,8 +66,7 @@ public class AggregatedLogDeletionServiceForTest extends AggregatedLogDeletionSe
return conf;
}
@Override
protected void stopRMClient() {
// DO NOTHING
public ApplicationClientProtocol getMockRMClient() {
return mockRMClient;
}
}

View File

@ -28,10 +28,12 @@ import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService.LogDeletionTask;
import org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.AppDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@ -77,6 +79,7 @@ public class LogAggregationTestcase {
private List<PathWithFileStatus> appDirs;
private final List<AppDescriptor> appDescriptors;
private AggregatedLogDeletionServiceForTest deletionService;
private ApplicationClientProtocol rmClient;
public LogAggregationTestcase(LogAggregationTestcaseBuilder builder) throws IOException {
conf = builder.conf;
@ -102,6 +105,8 @@ public class LogAggregationTestcase {
mockFs = ((FilterFileSystem) builder.rootFs).getRawFileSystem();
validateAppControllers();
setupMocks();
setupDeletionService();
}
private void validateAppControllers() {
@ -241,10 +246,13 @@ public class LogAggregationTestcase {
when(mockFs.listStatus(dir.path)).thenReturn(fileStatuses);
}
public LogAggregationTestcase setupAndRunDeletionService() {
private void setupDeletionService() {
List<ApplicationId> finishedApps = createFinishedAppsList();
List<ApplicationId> runningApps = createRunningAppsList();
deletionService = new AggregatedLogDeletionServiceForTest(runningApps, finishedApps, conf);
}
public LogAggregationTestcase startDeletionService() {
deletionService.init(conf);
deletionService.start();
return this;
@ -271,10 +279,13 @@ public class LogAggregationTestcase {
public LogAggregationTestcase runDeletionTask(long retentionSeconds) throws Exception {
List<ApplicationId> finishedApps = createFinishedAppsList();
List<ApplicationId> runningApps = createRunningAppsList();
ApplicationClientProtocol rmClient = createMockRMClient(finishedApps, runningApps);
AggregatedLogDeletionService.LogDeletionTask deletionTask =
new AggregatedLogDeletionService.LogDeletionTask(conf, retentionSeconds, rmClient);
deletionTask.run();
rmClient = createMockRMClient(finishedApps, runningApps);
List<LogDeletionTask> tasks = deletionService.createLogDeletionTasks(conf, retentionSeconds,
rmClient);
for (LogDeletionTask deletionTask : tasks) {
deletionTask.run();
}
return this;
}
@ -359,8 +370,20 @@ public class LogAggregationTestcase {
verify(mockFs, timeout(timeout).times(times)).delete(file.path, true);
}
public void teardown() {
private void verifyMockRmClientWasClosedNTimes(int expectedRmClientCloses)
throws IOException {
ApplicationClientProtocol mockRMClient;
if (deletionService != null) {
mockRMClient = deletionService.getMockRMClient();
} else {
mockRMClient = rmClient;
}
verify((Closeable)mockRMClient, times(expectedRmClientCloses)).close();
}
public void teardown(int expectedRmClientCloses) throws IOException {
deletionService.stop();
verifyMockRmClientWasClosedNTimes(expectedRmClientCloses);
}
public LogAggregationTestcase refreshLogRetentionSettings() throws IOException {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.logaggregation.testutils;
import org.apache.hadoop.test.MockitoUtil;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@ -34,7 +35,8 @@ public class MockRMClientUtils {
public static ApplicationClientProtocol createMockRMClient(
List<ApplicationId> finishedApplications,
List<ApplicationId> runningApplications) throws Exception {
final ApplicationClientProtocol mockProtocol = mock(ApplicationClientProtocol.class);
final ApplicationClientProtocol mockProtocol =
MockitoUtil.mockProtocol(ApplicationClientProtocol.class);
if (finishedApplications != null && !finishedApplications.isEmpty()) {
for (ApplicationId appId : finishedApplications) {
GetApplicationReportRequest request = GetApplicationReportRequest.newInstance(appId);