HDFS-8178. QJM doesn't move aside stale inprogress edits files. Contributed by Istvan Fajth.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
Istvan Fajth 2019-08-29 14:55:17 -07:00 committed by Wei-Chiu Chuang
parent 3b22fcd377
commit fcb7884bfc
4 changed files with 180 additions and 67 deletions

View File

@ -52,11 +52,6 @@ class JNStorage extends Storage {
private final StorageDirectory sd;
private StorageState state;
private static final List<Pattern> CURRENT_DIR_PURGE_REGEXES =
ImmutableList.of(
Pattern.compile("edits_\\d+-(\\d+)"),
Pattern.compile("edits_inprogress_(\\d+)(?:\\..*)?"));
private static final List<Pattern> PAXOS_DIR_PURGE_REGEXES =
ImmutableList.of(Pattern.compile("(\\d+)"));
@ -181,8 +176,8 @@ File getRoot() {
* the given txid.
*/
void purgeDataOlderThan(long minTxIdToKeep) throws IOException {
purgeMatching(sd.getCurrentDir(),
CURRENT_DIR_PURGE_REGEXES, minTxIdToKeep);
fjm.purgeLogsOlderThan(minTxIdToKeep);
purgeMatching(getOrCreatePaxosDir(),
PAXOS_DIR_PURGE_REGEXES, minTxIdToKeep);
}

View File

@ -75,7 +75,8 @@ public class FileJournalManager implements JournalManager {
private static final Pattern EDITS_INPROGRESS_STALE_REGEX = Pattern.compile(
NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+).*(\\S+)");
private File currentInProgress = null;
@VisibleForTesting
File currentInProgress = null;
/**
* A FileJournalManager should maintain the largest Tx ID that has been
@ -178,19 +179,49 @@ public void setLastReadableTxId(long id) {
this.lastReadableTxId = id;
}
/**
* Purges the unnecessary edits and edits_inprogress files.
*
* Edits files that are ending before the minTxIdToKeep are purged.
* Edits in progress files that are starting before minTxIdToKeep are purged.
* Edits in progress files that are marked as empty, trash, corrupted or
* stale by file extension and starting before minTxIdToKeep are purged.
* Edits in progress files that are after minTxIdToKeep, but before the
* current edits in progress files are marked as stale for clarity.
*
* In case file removal or rename is failing a warning is logged, but that
* does not fail the operation.
*
* @param minTxIdToKeep the lowest transaction ID that should be retained
* @throws IOException if listing the storage directory fails.
*/
@Override
public void purgeLogsOlderThan(long minTxIdToKeep)
throws IOException {
LOG.info("Purging logs older than " + minTxIdToKeep);
File[] files = FileUtil.listFiles(sd.getCurrentDir());
List<EditLogFile> editLogs = matchEditLogs(files, true);
synchronized (this) {
for (EditLogFile log : editLogs) {
if (log.getFirstTxId() < minTxIdToKeep &&
log.getLastTxId() < minTxIdToKeep) {
purger.purgeLog(log);
} else if (isStaleInProgressLog(minTxIdToKeep, log)) {
purger.markStale(log);
}
}
}
}
private boolean isStaleInProgressLog(long minTxIdToKeep, EditLogFile log) {
return log.isInProgress() &&
!log.getFile().equals(currentInProgress) &&
log.getFirstTxId() >= minTxIdToKeep &&
// at last we check if this segment is not already marked as .trash,
// .empty or .corrupted, in which case it does not match the strict
// regex pattern.
EDITS_INPROGRESS_REGEX.matcher(log.getFile().getName()).matches();
}
/**
* Find all editlog segments starting at or above the given txid.
@ -597,6 +628,11 @@ public void moveAsideEmptyFile() throws IOException {
renameSelf(".empty");
}
public void moveAsideStaleInprogressFile() throws IOException {
assert isInProgress;
renameSelf(".stale");
}
private void renameSelf(String newSuffix) throws IOException {
File src = file;
File dst = new File(src.getParent(), src.getName() + newSuffix);

View File

@ -207,21 +207,22 @@ private long getImageTxIdToRetain(
/**
* Interface responsible for disposing of old checkpoints and edit logs.
*/
static interface StoragePurger {
interface StoragePurger {
void purgeLog(EditLogFile log);
void purgeImage(FSImageFile image);
void markStale(EditLogFile log);
}
static class DeletionStoragePurger implements StoragePurger {
@Override
public void purgeLog(EditLogFile log) {
LOG.info("Purging old edit log " + log);
LOG.info("Purging old edit log {}", log);
deleteOrWarn(log.getFile());
}
@Override
public void purgeImage(FSImageFile image) {
LOG.info("Purging old image " + image);
LOG.info("Purging old image {}", image);
deleteOrWarn(image.getFile());
deleteOrWarn(MD5FileUtils.getDigestFileForFile(image.getFile()));
}
@ -230,7 +231,17 @@ private static void deleteOrWarn(File file) {
if (!file.delete()) {
// It's OK if we fail to delete something -- we'll catch it
// next time we swing through this directory.
LOG.warn("Could not delete " + file);
LOG.warn("Could not delete {}", file);
}
}
public void markStale(EditLogFile log){
try {
log.moveAsideStaleInprogressFile();
} catch (IOException e) {
// It is ok to just log the rename failure and go on, we will try next
// time just as with deletions.
LOG.warn("Could not mark {} as stale", log, e);
}
}
}

View File

@ -25,12 +25,17 @@
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.ToLongFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
@ -215,6 +220,8 @@ public void testRetainExtraLogsLimitedSegments() throws IOException {
// Segments containing txns upto txId 250 are extra and should be purged.
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(1, 100), true);
tc.addLog("/foo2/current/" + getInProgressEditsFileName(101) + ".trash",
true);
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(101, 175), true);
tc.addLog("/foo2/current/" + getInProgressEditsFileName(176) + ".empty",
true);
@ -226,6 +233,8 @@ public void testRetainExtraLogsLimitedSegments() throws IOException {
// Only retain 2 extra segments. The 301-350 and 351-400 segments are
// considered required, not extra.
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(241, 275), false);
tc.addLog("/foo2/current/" + getInProgressEditsFileName(276) + ".trash",
false);
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(276, 300), false);
tc.addLog("/foo2/current/" + getInProgressEditsFileName(301) + ".empty",
false);
@ -237,6 +246,43 @@ public void testRetainExtraLogsLimitedSegments() throws IOException {
runTest(tc);
}
/* We are checking here the JournalNode environment hence added the paxos
* directory, but as the test here is about the FileJournalManager it happens
* via the NNStorageRetentionManager and that needs the fsImage files as well
* to be present in the folder to calculate the minimum transaction id we want
* to keep based on the config.
*/
@Test
public void testExtraInprogressFilesAreRemovedOrMarkedStale()
throws IOException {
conf.setLong(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 150);
TestCaseDescription tc = new TestCaseDescription();
tc.addRoot("/foo", NameNodeDirType.IMAGE_AND_EDITS);
final String PATH = "/foo/current/";
tc.addImage(PATH + getImageFileName(200), true);
tc.addImage(PATH + getImageFileName(300), false);
tc.addImage(PATH + getImageFileName(400), false);
File file = Mockito.spy(new File(PATH + "paxos"));
Mockito.when(file.isDirectory()).thenReturn(true);
tc.addFile(file);
tc.addLog(PATH + getFinalizedEditsFileName(1,75), true);
tc.addLog(PATH + getInProgressEditsFileName(76), true);
tc.addLog(PATH + getFinalizedEditsFileName(76, 120), true);
tc.addLog(PATH + getInProgressEditsFileName(121) + ".stale", true);
tc.addLog(PATH + getFinalizedEditsFileName(121, 150), true);
// everything down from here should be kept.
tc.addLog(PATH + getInProgressEditsFileName(151), false, true);
tc.addLog(PATH + getFinalizedEditsFileName(151, 320), false);
tc.addLog(PATH + getInProgressEditsFileName(321), false, true);
tc.addLog(PATH + getFinalizedEditsFileName(321, 430), false);
tc.addLog(PATH + getInProgressEditsFileName(431), false);
runTest(tc);
}
private void runTest(TestCaseDescription tc) throws IOException {
StoragePurger mockPurger =
Mockito.mock(NNStorageRetentionManager.StoragePurger.class);
@ -244,6 +290,8 @@ private void runTest(TestCaseDescription tc) throws IOException {
ArgumentCaptor.forClass(FSImageFile.class);
ArgumentCaptor<EditLogFile> logsPurgedCaptor =
ArgumentCaptor.forClass(EditLogFile.class);
ArgumentCaptor<EditLogFile> staleLogsCaptor =
ArgumentCaptor.forClass(EditLogFile.class);
// Ask the manager to purge files we don't need any more
new NNStorageRetentionManager(conf,
@ -255,30 +303,42 @@ private void runTest(TestCaseDescription tc) throws IOException {
.purgeImage(imagesPurgedCaptor.capture());
Mockito.verify(mockPurger, Mockito.atLeast(0))
.purgeLog(logsPurgedCaptor.capture());
Mockito.verify(mockPurger, Mockito.atLeast(0))
.markStale(staleLogsCaptor.capture());
Set<String> capturedPaths = Sets.newLinkedHashSet();
// Check images
Set<String> purgedPaths = Sets.newLinkedHashSet();
for (FSImageFile purged : imagesPurgedCaptor.getAllValues()) {
purgedPaths.add(fileToPath(purged.getFile()));
for (FSImageFile captured : imagesPurgedCaptor.getAllValues()) {
capturedPaths.add(fileToPath(captured.getFile()));
}
Assert.assertEquals(
Assert.assertEquals("Image file check.",
Joiner.on(",").join(filesToPaths(tc.expectedPurgedImages)),
Joiner.on(",").join(purgedPaths));
Joiner.on(",").join(capturedPaths));
// Check images
purgedPaths.clear();
for (EditLogFile purged : logsPurgedCaptor.getAllValues()) {
purgedPaths.add(fileToPath(purged.getFile()));
capturedPaths.clear();
// Check edit logs, and also in progress edits older than minTxIdToKeep
for (EditLogFile captured : logsPurgedCaptor.getAllValues()) {
capturedPaths.add(fileToPath(captured.getFile()));
}
Assert.assertEquals(
Assert.assertEquals("Check old edits are removed.",
Joiner.on(",").join(filesToPaths(tc.expectedPurgedLogs)),
Joiner.on(",").join(purgedPaths));
Joiner.on(",").join(capturedPaths));
capturedPaths.clear();
// Check in progress edits to keep are marked as stale
for (EditLogFile captured : staleLogsCaptor.getAllValues()) {
capturedPaths.add(fileToPath(captured.getFile()));
}
Assert.assertEquals("Check unnecessary but kept edits are marked stale",
Joiner.on(",").join(filesToPaths(tc.expectedStaleLogs)),
Joiner.on(",").join(capturedPaths));
}
private class TestCaseDescription {
private final Map<File, FakeRoot> dirRoots = Maps.newLinkedHashMap();
private final Set<File> expectedPurgedLogs = Sets.newLinkedHashSet();
private final Set<File> expectedPurgedImages = Sets.newLinkedHashSet();
private final Set<File> expectedStaleLogs = Sets.newLinkedHashSet();
private class FakeRoot {
final NameNodeDirType type;
@ -309,11 +369,18 @@ private void addFile(File file) {
}
void addLog(String path, boolean expectPurge) {
addLog(path, expectPurge, false);
}
void addLog(String path, boolean expectPurge, boolean expectStale) {
File file = new File(path);
addFile(file);
if (expectPurge) {
expectedPurgedLogs.add(file);
}
if (expectStale) {
expectedStaleLogs.add(file);
}
}
void addImage(String path, boolean expectPurge) {
@ -332,6 +399,21 @@ NNStorage mockStorage() throws IOException {
return mockStorageForDirs(sds.toArray(new StorageDirectory[0]));
}
private File findLastInProgressEdit(FakeRoot root){
Pattern p = Pattern.compile(
NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
ToLongFunction<File> fileNameToTxId =
f -> {
Matcher m = p.matcher(f.getName());
return m.matches() ?
Long.parseLong(m.group(1)):
HdfsServerConstants.INVALID_TXID;
};
return root.files.stream().
sorted(Comparator.comparingLong(fileNameToTxId).reversed()).
findFirst().orElse(null);
}
@SuppressWarnings("unchecked")
public FSEditLog mockEditLog(StoragePurger purger) throws IOException {
final List<JournalManager> jms = Lists.newArrayList();
@ -342,36 +424,28 @@ public FSEditLog mockEditLog(StoragePurger purger) throws IOException {
// passing null NNStorage for unit test because it does not use it
FileJournalManager fjm = new FileJournalManager(conf,
root.mockStorageDir(), null);
fjm.currentInProgress = findLastInProgressEdit(root);
fjm.purger = purger;
jms.add(fjm);
journalSet.add(fjm, false);
}
FSEditLog mockLog = Mockito.mock(FSEditLog.class);
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Mockito.doAnswer(invocation -> {
Object[] args = invocation.getArguments();
assert args.length == 1;
long txId = (Long) args[0];
for (JournalManager jm : jms) {
jm.purgeLogsOlderThan(txId);
}
return null;
}
}).when(mockLog).purgeLogsOlderThan(Mockito.anyLong());
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Mockito.doAnswer(invocation -> {
Object[] args = invocation.getArguments();
journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
(Long)args[1], (Boolean)args[2], (Boolean)args[3]);
return null;
}
}).when(mockLog).selectInputStreams(Mockito.anyCollection(),
Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
return mockLog;
@ -402,19 +476,16 @@ private static Collection<String> filesToPaths(Collection<File> files) {
return paths;
}
private static NNStorage mockStorageForDirs(final StorageDirectory ... mockDirs)
private static NNStorage mockStorageForDirs(final StorageDirectory... mockDirs)
throws IOException {
NNStorage mockStorage = Mockito.mock(NNStorage.class);
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Mockito.doAnswer(invocation -> {
FSImageStorageInspector inspector =
(FSImageStorageInspector) invocation.getArguments()[0];
for (StorageDirectory sd : mockDirs) {
inspector.inspectDirectory(sd);
}
return null;
}
}).when(mockStorage).inspectStorageDirs(any());
return mockStorage;
}