YARN-7495. Improve robustness of the AggregatedLogDeletionService. Contributed by Jonathan Eagles

(cherry picked from commit 5cfaee2e6d)
This commit is contained in:
Jason Lowe 2017-11-30 12:39:18 -06:00
parent 40372be7bb
commit f072caa0bc
2 changed files with 122 additions and 36 deletions

View File

@ -85,49 +85,67 @@ public class AggregatedLogDeletionService extends AbstractService {
deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient); deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient);
} }
} }
} catch (IOException e) { } catch (Throwable t) {
logIOException("Error reading root log dir this deletion " + logException("Error reading root log dir this deletion " +
"attempt is being aborted", e); "attempt is being aborted", t);
} }
LOG.info("aggregated log deletion finished."); LOG.info("aggregated log deletion finished.");
} }
private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis, private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
FileSystem fs, ApplicationClientProtocol rmClient) { FileSystem fs, ApplicationClientProtocol rmClient) {
FileStatus[] appDirs;
try { try {
for(FileStatus appDir : fs.listStatus(dir)) { appDirs = fs.listStatus(dir);
if(appDir.isDirectory() && } catch (IOException e) {
appDir.getModificationTime() < cutoffMillis) { logException("Could not read the contents of " + dir, e);
boolean appTerminated = return;
isApplicationTerminated(ApplicationId.fromString(appDir }
.getPath().getName()), rmClient); for (FileStatus appDir : appDirs) {
if(appTerminated && shouldDeleteLogDir(appDir, cutoffMillis, fs)) { deleteAppDirLogs(cutoffMillis, fs, rmClient, appDir);
try { }
LOG.info("Deleting aggregated logs in "+appDir.getPath()); }
fs.delete(appDir.getPath(), true);
} catch (IOException e) { private static void deleteAppDirLogs(long cutoffMillis, FileSystem fs,
logIOException("Could not delete "+appDir.getPath(), e); ApplicationClientProtocol rmClient,
} FileStatus appDir) {
} else if (!appTerminated){ try {
try { if (appDir.isDirectory() &&
for(FileStatus node: fs.listStatus(appDir.getPath())) { appDir.getModificationTime() < cutoffMillis) {
if(node.getModificationTime() < cutoffMillis) { ApplicationId appId = ApplicationId.fromString(
try { appDir.getPath().getName());
fs.delete(node.getPath(), true); boolean appTerminated = isApplicationTerminated(appId, rmClient);
} catch (IOException ex) { if (!appTerminated) {
logIOException("Could not delete "+appDir.getPath(), ex); // Application is still running
} FileStatus[] logFiles;
} try {
logFiles = fs.listStatus(appDir.getPath());
} catch (IOException e) {
logException("Error reading the contents of "
+ appDir.getPath(), e);
return;
}
for (FileStatus node : logFiles) {
if (node.getModificationTime() < cutoffMillis) {
try {
fs.delete(node.getPath(), true);
} catch (IOException ex) {
logException("Could not delete " + appDir.getPath(), ex);
} }
} catch(IOException e) {
logIOException(
"Error reading the contents of " + appDir.getPath(), e);
} }
} }
} else if (shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
// Application is no longer running
try {
LOG.info("Deleting aggregated logs in " + appDir.getPath());
fs.delete(appDir.getPath(), true);
} catch (IOException e) {
logException("Could not delete " + appDir.getPath(), e);
}
} }
} }
} catch (IOException e) { } catch (Exception e) {
logIOException("Could not read the contents of " + dir, e); logException("Could not delete " + appDir.getPath(), e);
} }
} }
@ -142,7 +160,7 @@ public class AggregatedLogDeletionService extends AbstractService {
} }
} }
} catch(IOException e) { } catch(IOException e) {
logIOException("Error reading the contents of " + dir.getPath(), e); logException("Error reading the contents of " + dir.getPath(), e);
shouldDelete = false; shouldDelete = false;
} }
return shouldDelete; return shouldDelete;
@ -172,14 +190,14 @@ public class AggregatedLogDeletionService extends AbstractService {
} }
} }
private static void logIOException(String comment, IOException e) { private static void logException(String comment, Throwable t) {
if(e instanceof AccessControlException) { if(t instanceof AccessControlException) {
String message = e.getMessage(); String message = t.getMessage();
//TODO fix this after HADOOP-8661 //TODO fix this after HADOOP-8661
message = message.split("\n")[0]; message = message.split("\n")[0];
LOG.warn(comment + " " + message); LOG.warn(comment + " " + message);
} else { } else {
LOG.error(comment, e); LOG.error(comment, t);
} }
} }

View File

@ -385,6 +385,74 @@ public class TestAggregatedLogDeletionService {
deletionSvc.stop(); deletionSvc.stop();
} }
@Test
public void testRobustLogDeletion() throws Exception {
final long RETENTION_SECS = 10 * 24 * 3600;
String root = "mockfs://foo/";
String remoteRootLogDir = root+"tmp/logs";
String suffix = "logs";
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class,
FileSystem.class);
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
"1");
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
// prevent us from picking up the same mockfs instance from another test
FileSystem.closeAll();
Path rootPath = new Path(root);
FileSystem rootFs = rootPath.getFileSystem(conf);
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
Path remoteRootLogPath = new Path(remoteRootLogDir);
Path userDir = new Path(remoteRootLogPath, "me");
FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir);
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
new FileStatus[]{userDirStatus});
Path userLogDir = new Path(userDir, suffix);
ApplicationId appId1 =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
Path app1Dir = new Path(userLogDir, appId1.toString());
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, 0, app1Dir);
ApplicationId appId2 =
ApplicationId.newInstance(System.currentTimeMillis(), 2);
Path app2Dir = new Path(userLogDir, "application_a");
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, 0, app2Dir);
ApplicationId appId3 =
ApplicationId.newInstance(System.currentTimeMillis(), 3);
Path app3Dir = new Path(userLogDir, appId3.toString());
FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, 0, app3Dir);
when(mockFs.listStatus(userLogDir)).thenReturn(
new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus});
when(mockFs.listStatus(app1Dir)).thenThrow(
new RuntimeException("Should Be Caught and Logged"));
Path app3Log3 = new Path(app3Dir, "host1");
FileStatus app3Log3Status = new FileStatus(10, false, 1, 1, 0, app3Log3);
when(mockFs.listStatus(app3Dir)).thenReturn(
new FileStatus[]{app3Log3Status});
final List<ApplicationId> finishedApplications =
Collections.unmodifiableList(Arrays.asList(appId1, appId3));
ApplicationClientProtocol rmClient =
createMockRMClient(finishedApplications, null);
AggregatedLogDeletionService.LogDeletionTask deletionTask =
new AggregatedLogDeletionService.LogDeletionTask(conf,
RETENTION_SECS,
rmClient);
deletionTask.run();
verify(mockFs).delete(app3Dir, true);
}
static class MockFileSystem extends FilterFileSystem { static class MockFileSystem extends FilterFileSystem {
MockFileSystem() { MockFileSystem() {
super(mock(FileSystem.class)); super(mock(FileSystem.class));