YARN-11241. Add uncleaning option for local app log file with log-aggregation enabled (#4703)
Co-authored-by: Ashutosh Gupta <ashugpt@amazon.com> Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
parent
cde1f3af21
commit
65a027b112
|
@ -1552,6 +1552,13 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final long DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS
|
||||
= 10 * 60 * 1000;
|
||||
|
||||
/**
|
||||
* Whether to clean up nodemanager logs when log aggregation is enabled.
|
||||
*/
|
||||
public static final String LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP =
|
||||
YARN_PREFIX + "log-aggregation.enable-local-cleanup";
|
||||
public static final boolean DEFAULT_LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP = true;
|
||||
|
||||
/**
|
||||
* Number of seconds to retain logs on the NodeManager. Only applicable if Log
|
||||
* aggregation is disabled
|
||||
|
|
|
@ -1595,6 +1595,15 @@
|
|||
<value>600000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Whether to clean up nodemanager logs when log aggregation is enabled. Setting to
|
||||
false disables the cleanup nodemanager logging, and it causes disk full in the long run. Users
|
||||
can set to false for test-only purpose.
|
||||
</description>
|
||||
<name>yarn.log-aggregation.enable-local-cleanup</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Time in seconds to retain user logs. Only applicable if
|
||||
log aggregation is disabled
|
||||
|
|
|
@ -86,6 +86,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|||
private final Dispatcher dispatcher;
|
||||
private final ApplicationId appId;
|
||||
private final String applicationId;
|
||||
private final boolean enableLocalCleanup;
|
||||
private boolean logAggregationDisabled = false;
|
||||
private final Configuration conf;
|
||||
private final DeletionService delService;
|
||||
|
@ -172,6 +173,13 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|||
this.logAggregationContext = logAggregationContext;
|
||||
this.context = context;
|
||||
this.nodeId = nodeId;
|
||||
this.enableLocalCleanup =
|
||||
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP);
|
||||
if (!this.enableLocalCleanup) {
|
||||
LOG.warn("{} is only for testing and not for any production system ",
|
||||
YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP);
|
||||
}
|
||||
this.logAggPolicy = getLogAggPolicy(conf);
|
||||
this.recoveredLogInitedTime = recoveredLogInitedTime;
|
||||
this.logFileSizeThreshold =
|
||||
|
@ -337,26 +345,26 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|||
appFinished, finishedContainers.contains(container));
|
||||
if (uploadedFilePathsInThisCycle.size() > 0) {
|
||||
uploadedLogsInThisCycle = true;
|
||||
LOG.trace("Uploaded the following files for {}: {}",
|
||||
container, uploadedFilePathsInThisCycle.toString());
|
||||
List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
|
||||
uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
for (Path uploadedFilePath : uploadedFilePathsInThisCycleList) {
|
||||
try {
|
||||
long fileSize = lfs.getFileStatus(uploadedFilePath).getLen();
|
||||
if (fileSize >= logFileSizeThreshold) {
|
||||
LOG.debug("Log File " + uploadedFilePath
|
||||
+ " size is " + fileSize + " bytes");
|
||||
if (enableLocalCleanup) {
|
||||
LOG.trace("Uploaded the following files for {}: {}", container,
|
||||
uploadedFilePathsInThisCycle.toString());
|
||||
List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
|
||||
uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
for (Path uploadedFilePath : uploadedFilePathsInThisCycleList) {
|
||||
try {
|
||||
long fileSize = lfs.getFileStatus(uploadedFilePath).getLen();
|
||||
if (fileSize >= logFileSizeThreshold) {
|
||||
LOG.debug("Log File " + uploadedFilePath + " size is " + fileSize + " bytes");
|
||||
}
|
||||
} catch (Exception e1) {
|
||||
LOG.error("Failed to get log file size " + e1);
|
||||
}
|
||||
} catch (Exception e1) {
|
||||
LOG.error("Failed to get log file size " + e1);
|
||||
}
|
||||
}
|
||||
deletionTask = new FileDeletionTask(delService, this.userUgi.getShortUserName(), null,
|
||||
uploadedFilePathsInThisCycleList);
|
||||
}
|
||||
deletionTask = new FileDeletionTask(delService,
|
||||
this.userUgi.getShortUserName(), null,
|
||||
uploadedFilePathsInThisCycleList);
|
||||
}
|
||||
|
||||
// This container is finished, and all its logs have been uploaded,
|
||||
|
@ -528,6 +536,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|||
}
|
||||
|
||||
private void doAppLogAggregationPostCleanUp() {
|
||||
if (!enableLocalCleanup) {
|
||||
return;
|
||||
}
|
||||
// Remove the local app-log-dirs
|
||||
List<Path> localAppLogDirs = new ArrayList<Path>();
|
||||
for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
|
||||
|
|
|
@ -234,31 +234,47 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|||
// ensure filesystems were closed
|
||||
verify(logAggregationService).closeFileSystems(
|
||||
any(UserGroupInformation.class));
|
||||
List<Path> dirList = new ArrayList<>();
|
||||
dirList.add(new Path(app1LogDir.toURI()));
|
||||
verify(delSrvc, times(2)).delete(argThat(new FileDeletionMatcher(
|
||||
delSrvc, user, null, dirList)));
|
||||
|
||||
String containerIdStr = container11.toString();
|
||||
File containerLogDir = new File(app1LogDir, containerIdStr);
|
||||
int count = 0;
|
||||
int maxAttempts = 50;
|
||||
for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
|
||||
File f = new File(containerLogDir, fileType);
|
||||
count = 0;
|
||||
while ((f.exists()) && (count < maxAttempts)) {
|
||||
count++;
|
||||
Thread.sleep(100);
|
||||
boolean filesShouldBeDeleted =
|
||||
this.conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP);
|
||||
if (filesShouldBeDeleted) {
|
||||
List<Path> dirList = new ArrayList<>();
|
||||
dirList.add(new Path(app1LogDir.toURI()));
|
||||
verify(delSrvc, times(2)).delete(argThat(new FileDeletionMatcher(
|
||||
delSrvc, user, null, dirList)));
|
||||
|
||||
String containerIdStr = container11.toString();
|
||||
File containerLogDir = new File(app1LogDir, containerIdStr);
|
||||
int count = 0;
|
||||
int maxAttempts = 50;
|
||||
for (String fileType : new String[]{"stdout", "stderr", "syslog"}) {
|
||||
File f = new File(containerLogDir, fileType);
|
||||
count = 0;
|
||||
while ((f.exists()) && (count < maxAttempts)) {
|
||||
count++;
|
||||
Thread.sleep(100);
|
||||
}
|
||||
Assert.assertFalse("File [" + f + "] was not deleted", f.exists());
|
||||
}
|
||||
Assert.assertFalse("File [" + f + "] was not deleted", f.exists());
|
||||
Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted",
|
||||
app1LogDir.exists());
|
||||
} else {
|
||||
List<Path> dirList = new ArrayList<>();
|
||||
dirList.add(new Path(app1LogDir.toURI()));
|
||||
verify(delSrvc, never()).delete(argThat(new FileDeletionMatcher(
|
||||
delSrvc, user, null, dirList)));
|
||||
|
||||
String containerIdStr = container11.toString();
|
||||
File containerLogDir = new File(app1LogDir, containerIdStr);
|
||||
Thread.sleep(5000);
|
||||
for (String fileType : new String[]{"stdout", "stderr", "syslog"}) {
|
||||
File f = new File(containerLogDir, fileType);
|
||||
Assert.assertTrue("File [" + f + "] was not deleted", f.exists());
|
||||
}
|
||||
Assert.assertTrue("Directory [" + app1LogDir + "] was not deleted",
|
||||
app1LogDir.exists());
|
||||
}
|
||||
count = 0;
|
||||
while ((app1LogDir.exists()) && (count < maxAttempts)) {
|
||||
count++;
|
||||
Thread.sleep(100);
|
||||
}
|
||||
Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted",
|
||||
app1LogDir.exists());
|
||||
delSrvc.stop();
|
||||
|
||||
Path logFilePath = logAggregationService
|
||||
.getLogAggregationFileController(conf)
|
||||
|
@ -297,6 +313,20 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|||
verifyLocalFileDeletion(logAggregationService);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocalFileRemainsAfterUploadOnCleanupDisable() throws Exception {
|
||||
this.delSrvc = new DeletionService(createContainerExecutor());
|
||||
delSrvc = spy(delSrvc);
|
||||
this.delSrvc.init(conf);
|
||||
this.conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP, false);
|
||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
this.remoteRootLogDir.getAbsolutePath());
|
||||
LogAggregationService logAggregationService = spy(
|
||||
new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler));
|
||||
verifyLocalFileDeletion(logAggregationService);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocalFileDeletionOnDiskFull() throws Exception {
|
||||
this.delSrvc = new DeletionService(createContainerExecutor());
|
||||
|
|
Loading…
Reference in New Issue