YARN-9080. Added clean up of bucket directories.

Contributed by Prabhu Joseph, Peter Bacsko, Szilard Nemeth
This commit is contained in:
Eric Yang 2019-05-23 12:08:44 -04:00
parent ea0b1d8fba
commit 7b03072fd4
2 changed files with 100 additions and 18 deletions

View File

@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -456,43 +458,76 @@ public class EntityGroupFSTimelineStore extends CompositeService
* dirpath should be a directory that contains a set of * dirpath should be a directory that contains a set of
* application log directories. The cleaner method will not * application log directories. The cleaner method will not
* work if the given dirpath itself is an application log dir. * work if the given dirpath itself is an application log dir.
* @param fs
* @param retainMillis * @param retainMillis
* @throws IOException * @throws IOException
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@VisibleForTesting @VisibleForTesting
void cleanLogs(Path dirpath, FileSystem fs, long retainMillis) void cleanLogs(Path dirpath, long retainMillis)
throws IOException { throws IOException {
long now = Time.now(); long now = Time.now();
RemoteIterator<FileStatus> iter = list(dirpath);
while (iter.hasNext()) {
FileStatus stat = iter.next();
Path clusterTimeStampPath = stat.getPath();
if (isValidClusterTimeStampDir(clusterTimeStampPath)) {
MutableBoolean appLogDirPresent = new MutableBoolean(false);
cleanAppLogDir(clusterTimeStampPath, retainMillis, appLogDirPresent);
if (appLogDirPresent.isFalse() &&
(now - stat.getModificationTime() > retainMillis)) {
deleteDir(clusterTimeStampPath);
}
}
}
}
private void cleanAppLogDir(Path dirpath, long retainMillis,
MutableBoolean appLogDirPresent) throws IOException {
long now = Time.now();
// Depth first search from root directory for all application log dirs // Depth first search from root directory for all application log dirs
RemoteIterator<FileStatus> iter = list(dirpath); RemoteIterator<FileStatus> iter = list(dirpath);
while (iter.hasNext()) { while (iter.hasNext()) {
FileStatus stat = iter.next(); FileStatus stat = iter.next();
Path childPath = stat.getPath();
if (stat.isDirectory()) { if (stat.isDirectory()) {
// If current is an application log dir, decide if we need to remove it // If current is an application log dir, decide if we need to remove it
// and remove if necessary. // and remove if necessary.
// Otherwise, keep iterating into it. // Otherwise, keep iterating into it.
ApplicationId appId = parseApplicationId(dirpath.getName()); ApplicationId appId = parseApplicationId(childPath.getName());
if (appId != null) { // Application log dir if (appId != null) { // Application log dir
if (shouldCleanAppLogDir(dirpath, now, fs, retainMillis)) { appLogDirPresent.setTrue();
try { if (shouldCleanAppLogDir(childPath, now, fs, retainMillis)) {
LOG.info("Deleting {}", dirpath); deleteDir(childPath);
if (!fs.delete(dirpath, true)) {
LOG.error("Unable to remove " + dirpath);
}
metrics.incrLogsDirsCleaned();
} catch (IOException e) {
LOG.error("Unable to remove " + dirpath, e);
}
} }
} else { // Keep cleaning inside } else { // Keep cleaning inside
cleanLogs(stat.getPath(), fs, retainMillis); cleanAppLogDir(childPath, retainMillis, appLogDirPresent);
} }
} }
} }
} }
private void deleteDir(Path path) {
try {
LOG.info("Deleting {}", path);
if (fs.delete(path, true)) {
metrics.incrLogsDirsCleaned();
} else {
LOG.error("Unable to remove {}", path);
}
} catch (IOException e) {
LOG.error("Unable to remove {}", path, e);
}
}
private boolean isValidClusterTimeStampDir(Path clusterTimeStampPath)
throws IOException {
FileStatus stat = fs.getFileStatus(clusterTimeStampPath);
return stat.isDirectory() &&
StringUtils.isNumeric(clusterTimeStampPath.getName());
}
private static boolean shouldCleanAppLogDir(Path appLogPath, long now, private static boolean shouldCleanAppLogDir(Path appLogPath, long now,
FileSystem fs, long logRetainMillis) throws IOException { FileSystem fs, long logRetainMillis) throws IOException {
RemoteIterator<FileStatus> iter = fs.listStatusIterator(appLogPath); RemoteIterator<FileStatus> iter = fs.listStatusIterator(appLogPath);
@ -908,7 +943,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
LOG.debug("Cleaner starting"); LOG.debug("Cleaner starting");
long startTime = Time.monotonicNow(); long startTime = Time.monotonicNow();
try { try {
cleanLogs(doneRootPath, fs, logRetainMillis); cleanLogs(doneRootPath, logRetainMillis);
} catch (Exception e) { } catch (Exception e) {
Throwable t = extract(e); Throwable t = extract(e);
if (t instanceof InterruptedException) { if (t instanceof InterruptedException) {

View File

@ -268,7 +268,8 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
Path irrelevantDirPath = new Path(testDoneDirPath, "irrelevant"); Path irrelevantDirPath = new Path(testDoneDirPath, "irrelevant");
fs.mkdirs(irrelevantDirPath); fs.mkdirs(irrelevantDirPath);
Path doneAppHomeDir = new Path(new Path(testDoneDirPath, "0000"), "001"); Path doneAppHomeDir = new Path(new Path(new Path(testDoneDirPath,
Long.toString(mainTestAppId.getClusterTimestamp())), "0000"), "001");
// First application, untouched after creation // First application, untouched after creation
Path appDirClean = new Path(doneAppHomeDir, appDirName); Path appDirClean = new Path(doneAppHomeDir, appDirName);
Path attemptDirClean = new Path(appDirClean, attemptDirName); Path attemptDirClean = new Path(appDirClean, attemptDirName);
@ -300,7 +301,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
// Should retain all logs after this run // Should retain all logs after this run
MutableCounterLong dirsCleaned = store.metrics.getLogsDirsCleaned(); MutableCounterLong dirsCleaned = store.metrics.getLogsDirsCleaned();
long before = dirsCleaned.value(); long before = dirsCleaned.value();
store.cleanLogs(testDoneDirPath, fs, 10000); store.cleanLogs(testDoneDirPath, 10000);
assertTrue(fs.exists(irrelevantDirPath)); assertTrue(fs.exists(irrelevantDirPath));
assertTrue(fs.exists(irrelevantFilePath)); assertTrue(fs.exists(irrelevantFilePath));
assertTrue(fs.exists(filePath)); assertTrue(fs.exists(filePath));
@ -317,7 +318,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
// Touch the third application by creating a new dir // Touch the third application by creating a new dir
fs.mkdirs(new Path(dirPathHold, "holdByMe")); fs.mkdirs(new Path(dirPathHold, "holdByMe"));
store.cleanLogs(testDoneDirPath, fs, 1000); store.cleanLogs(testDoneDirPath, 1000);
// Verification after the second cleaner call // Verification after the second cleaner call
assertTrue(fs.exists(irrelevantDirPath)); assertTrue(fs.exists(irrelevantDirPath));
@ -332,6 +333,52 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
assertEquals(before + 2L, dirsCleaned.value()); assertEquals(before + 2L, dirsCleaned.value());
} }
@Test
public void testCleanBuckets() throws Exception {
// ClusterTimeStampDir with App Log Dirs
Path clusterTimeStampDir1 = new Path(testDoneDirPath,
Long.toString(sampleAppIds.get(0).getClusterTimestamp()));
Path appDir1 = new Path(new Path(new Path(
clusterTimeStampDir1, "0000"), "000"), sampleAppIds.get(0).toString());
Path appDir2 = new Path(new Path(new Path(
clusterTimeStampDir1, "0000"), "001"), sampleAppIds.get(1).toString());
Path appDir3 = new Path(new Path(new Path(
clusterTimeStampDir1, "0000"), "002"), sampleAppIds.get(2).toString());
Path appDir4 = new Path(new Path(new Path(
clusterTimeStampDir1, "0001"), "000"), sampleAppIds.get(3).toString());
// ClusterTimeStampDir with no App Log Dirs
Path clusterTimeStampDir2 = new Path(testDoneDirPath, "1235");
// Irrevelant ClusterTimeStampDir
Path clusterTimeStampDir3 = new Path(testDoneDirPath, "irrevelant");
Path appDir5 = new Path(new Path(new Path(
clusterTimeStampDir3, "0000"), "000"), sampleAppIds.get(4).toString());
fs.mkdirs(appDir1);
fs.mkdirs(appDir2);
fs.mkdirs(appDir3);
fs.mkdirs(appDir4);
fs.mkdirs(clusterTimeStampDir2);
fs.mkdirs(appDir5);
Thread.sleep(2000);
store.cleanLogs(testDoneDirPath, 1000);
// ClusterTimeStampDir will be removed only if no App Log Dir Present
assertTrue(fs.exists(clusterTimeStampDir1));
assertFalse(fs.exists(appDir1));
assertFalse(fs.exists(appDir2));
assertFalse(fs.exists(appDir3));
assertFalse(fs.exists(appDir4));
assertFalse(fs.exists(clusterTimeStampDir2));
assertTrue(fs.exists(appDir5));
store.cleanLogs(testDoneDirPath, 1000);
assertFalse(fs.exists(clusterTimeStampDir1));
}
@Test @Test
public void testPluginRead() throws Exception { public void testPluginRead() throws Exception {
// Verify precondition // Verify precondition