HDFS-8178. QJM doesn't move aside stale inprogress edits files. Contributed by Istvan Fajth.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
parent
7e9d324bcd
commit
e3c01e174c
|
@ -50,12 +50,7 @@ class JNStorage extends Storage {
|
|||
private final StorageDirectory sd;
|
||||
private StorageState state;
|
||||
|
||||
private static final List<Pattern> CURRENT_DIR_PURGE_REGEXES =
|
||||
ImmutableList.of(
|
||||
Pattern.compile("edits_\\d+-(\\d+)"),
|
||||
Pattern.compile("edits_inprogress_(\\d+)(?:\\..*)?"));
|
||||
|
||||
private static final List<Pattern> PAXOS_DIR_PURGE_REGEXES =
|
||||
private static final List<Pattern> PAXOS_DIR_PURGE_REGEXES =
|
||||
ImmutableList.of(Pattern.compile("(\\d+)"));
|
||||
|
||||
private static final String STORAGE_EDITS_SYNC = "edits.sync";
|
||||
|
@ -177,8 +172,8 @@ class JNStorage extends Storage {
|
|||
* the given txid.
|
||||
*/
|
||||
void purgeDataOlderThan(long minTxIdToKeep) throws IOException {
|
||||
purgeMatching(sd.getCurrentDir(),
|
||||
CURRENT_DIR_PURGE_REGEXES, minTxIdToKeep);
|
||||
fjm.purgeLogsOlderThan(minTxIdToKeep);
|
||||
|
||||
purgeMatching(getOrCreatePaxosDir(),
|
||||
PAXOS_DIR_PURGE_REGEXES, minTxIdToKeep);
|
||||
}
|
||||
|
|
|
@ -74,7 +74,8 @@ public class FileJournalManager implements JournalManager {
|
|||
private static final Pattern EDITS_INPROGRESS_STALE_REGEX = Pattern.compile(
|
||||
NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+).*(\\S+)");
|
||||
|
||||
private File currentInProgress = null;
|
||||
@VisibleForTesting
|
||||
File currentInProgress = null;
|
||||
|
||||
/**
|
||||
* A FileJournalManager should maintain the largest Tx ID that has been
|
||||
|
@ -177,20 +178,50 @@ public class FileJournalManager implements JournalManager {
|
|||
this.lastReadableTxId = id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Purges the unnecessary edits and edits_inprogress files.
|
||||
*
|
||||
* Edits files that are ending before the minTxIdToKeep are purged.
|
||||
* Edits in progress files that are starting before minTxIdToKeep are purged.
|
||||
* Edits in progress files that are marked as empty, trash, corrupted or
|
||||
* stale by file extension and starting before minTxIdToKeep are purged.
|
||||
* Edits in progress files that are after minTxIdToKeep, but before the
|
||||
* current edits in progress files are marked as stale for clarity.
|
||||
*
|
||||
* In case file removal or rename is failing a warning is logged, but that
|
||||
* does not fail the operation.
|
||||
*
|
||||
* @param minTxIdToKeep the lowest transaction ID that should be retained
|
||||
* @throws IOException if listing the storage directory fails.
|
||||
*/
|
||||
@Override
|
||||
public void purgeLogsOlderThan(long minTxIdToKeep)
|
||||
throws IOException {
|
||||
LOG.info("Purging logs older than " + minTxIdToKeep);
|
||||
File[] files = FileUtil.listFiles(sd.getCurrentDir());
|
||||
List<EditLogFile> editLogs = matchEditLogs(files, true);
|
||||
for (EditLogFile log : editLogs) {
|
||||
if (log.getFirstTxId() < minTxIdToKeep &&
|
||||
log.getLastTxId() < minTxIdToKeep) {
|
||||
purger.purgeLog(log);
|
||||
synchronized (this) {
|
||||
for (EditLogFile log : editLogs) {
|
||||
if (log.getFirstTxId() < minTxIdToKeep &&
|
||||
log.getLastTxId() < minTxIdToKeep) {
|
||||
purger.purgeLog(log);
|
||||
} else if (isStaleInProgressLog(minTxIdToKeep, log)) {
|
||||
purger.markStale(log);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isStaleInProgressLog(long minTxIdToKeep, EditLogFile log) {
|
||||
return log.isInProgress() &&
|
||||
!log.getFile().equals(currentInProgress) &&
|
||||
log.getFirstTxId() >= minTxIdToKeep &&
|
||||
// at last we check if this segment is not already marked as .trash,
|
||||
// .empty or .corrupted, in which case it does not match the strict
|
||||
// regex pattern.
|
||||
EDITS_INPROGRESS_REGEX.matcher(log.getFile().getName()).matches();
|
||||
}
|
||||
|
||||
/**
|
||||
* Find all editlog segments starting at or above the given txid.
|
||||
* @param firstTxId the txnid which to start looking
|
||||
|
@ -595,7 +626,12 @@ public class FileJournalManager implements JournalManager {
|
|||
assert lastTxId == HdfsServerConstants.INVALID_TXID;
|
||||
renameSelf(".empty");
|
||||
}
|
||||
|
||||
|
||||
public void moveAsideStaleInprogressFile() throws IOException {
|
||||
assert isInProgress;
|
||||
renameSelf(".stale");
|
||||
}
|
||||
|
||||
private void renameSelf(String newSuffix) throws IOException {
|
||||
File src = file;
|
||||
File dst = new File(src.getParent(), src.getName() + newSuffix);
|
||||
|
|
|
@ -208,9 +208,10 @@ public class NNStorageRetentionManager {
|
|||
/**
|
||||
* Interface responsible for disposing of old checkpoints and edit logs.
|
||||
*/
|
||||
static interface StoragePurger {
|
||||
interface StoragePurger {
|
||||
void purgeLog(EditLogFile log);
|
||||
void purgeImage(FSImageFile image);
|
||||
void markStale(EditLogFile log);
|
||||
}
|
||||
|
||||
static class DeletionStoragePurger implements StoragePurger {
|
||||
|
@ -234,6 +235,16 @@ public class NNStorageRetentionManager {
|
|||
LOG.warn("Could not delete " + file);
|
||||
}
|
||||
}
|
||||
|
||||
public void markStale(EditLogFile log){
|
||||
try {
|
||||
log.moveAsideStaleInprogressFile();
|
||||
} catch (IOException e) {
|
||||
// It is ok to just log the rename failure and go on, we will try next
|
||||
// time just as with deletions.
|
||||
LOG.warn("Could not mark " + log + " as stale", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -24,12 +24,17 @@ import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEdit
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.ToLongFunction;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
|
||||
|
@ -41,8 +46,6 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -214,6 +217,8 @@ public class TestNNStorageRetentionManager {
|
|||
|
||||
// Segments containing txns upto txId 250 are extra and should be purged.
|
||||
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(1, 100), true);
|
||||
tc.addLog("/foo2/current/" + getInProgressEditsFileName(101) + ".trash",
|
||||
true);
|
||||
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(101, 175), true);
|
||||
tc.addLog("/foo2/current/" + getInProgressEditsFileName(176) + ".empty",
|
||||
true);
|
||||
|
@ -225,6 +230,8 @@ public class TestNNStorageRetentionManager {
|
|||
// Only retain 2 extra segments. The 301-350 and 351-400 segments are
|
||||
// considered required, not extra.
|
||||
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(241, 275), false);
|
||||
tc.addLog("/foo2/current/" + getInProgressEditsFileName(276) + ".trash",
|
||||
false);
|
||||
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(276, 300), false);
|
||||
tc.addLog("/foo2/current/" + getInProgressEditsFileName(301) + ".empty",
|
||||
false);
|
||||
|
@ -235,14 +242,53 @@ public class TestNNStorageRetentionManager {
|
|||
tc.addLog("/foo2/current/" + getInProgressEditsFileName(401), false);
|
||||
runTest(tc);
|
||||
}
|
||||
|
||||
/* We are checking here the JournalNode environment hence added the paxos
|
||||
* directory, but as the test here is about the FileJournalManager it happens
|
||||
* via the NNStorageRetentionManager and that needs the fsImage files as well
|
||||
* to be present in the folder to calculate the minimum transaction id we want
|
||||
* to keep based on the config.
|
||||
*/
|
||||
@Test
|
||||
public void testExtraInprogressFilesAreRemovedOrMarkedStale()
|
||||
throws IOException {
|
||||
conf.setLong(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 150);
|
||||
TestCaseDescription tc = new TestCaseDescription();
|
||||
tc.addRoot("/foo", NameNodeDirType.IMAGE_AND_EDITS);
|
||||
final String PATH = "/foo/current/";
|
||||
|
||||
tc.addImage(PATH + getImageFileName(200), true);
|
||||
tc.addImage(PATH + getImageFileName(300), false);
|
||||
tc.addImage(PATH + getImageFileName(400), false);
|
||||
|
||||
File file = Mockito.spy(new File(PATH + "paxos"));
|
||||
Mockito.when(file.isDirectory()).thenReturn(true);
|
||||
tc.addFile(file);
|
||||
|
||||
tc.addLog(PATH + getFinalizedEditsFileName(1,75), true);
|
||||
tc.addLog(PATH + getInProgressEditsFileName(76), true);
|
||||
tc.addLog(PATH + getFinalizedEditsFileName(76, 120), true);
|
||||
tc.addLog(PATH + getInProgressEditsFileName(121) + ".stale", true);
|
||||
tc.addLog(PATH + getFinalizedEditsFileName(121, 150), true);
|
||||
// everything down from here should be kept.
|
||||
tc.addLog(PATH + getInProgressEditsFileName(151), false, true);
|
||||
tc.addLog(PATH + getFinalizedEditsFileName(151, 320), false);
|
||||
tc.addLog(PATH + getInProgressEditsFileName(321), false, true);
|
||||
tc.addLog(PATH + getFinalizedEditsFileName(321, 430), false);
|
||||
tc.addLog(PATH + getInProgressEditsFileName(431), false);
|
||||
|
||||
runTest(tc);
|
||||
}
|
||||
|
||||
private void runTest(TestCaseDescription tc) throws IOException {
|
||||
StoragePurger mockPurger =
|
||||
Mockito.mock(NNStorageRetentionManager.StoragePurger.class);
|
||||
ArgumentCaptor<FSImageFile> imagesPurgedCaptor =
|
||||
ArgumentCaptor.forClass(FSImageFile.class);
|
||||
ArgumentCaptor.forClass(FSImageFile.class);
|
||||
ArgumentCaptor<EditLogFile> logsPurgedCaptor =
|
||||
ArgumentCaptor.forClass(EditLogFile.class);
|
||||
ArgumentCaptor.forClass(EditLogFile.class);
|
||||
ArgumentCaptor<EditLogFile> staleLogsCaptor =
|
||||
ArgumentCaptor.forClass(EditLogFile.class);
|
||||
|
||||
// Ask the manager to purge files we don't need any more
|
||||
new NNStorageRetentionManager(conf,
|
||||
|
@ -254,31 +300,43 @@ public class TestNNStorageRetentionManager {
|
|||
.purgeImage(imagesPurgedCaptor.capture());
|
||||
Mockito.verify(mockPurger, Mockito.atLeast(0))
|
||||
.purgeLog(logsPurgedCaptor.capture());
|
||||
Mockito.verify(mockPurger, Mockito.atLeast(0))
|
||||
.markStale(staleLogsCaptor.capture());
|
||||
|
||||
Set<String> capturedPaths = Sets.newLinkedHashSet();
|
||||
// Check images
|
||||
Set<String> purgedPaths = Sets.newLinkedHashSet();
|
||||
for (FSImageFile purged : imagesPurgedCaptor.getAllValues()) {
|
||||
purgedPaths.add(fileToPath(purged.getFile()));
|
||||
}
|
||||
Assert.assertEquals(
|
||||
for (FSImageFile captured : imagesPurgedCaptor.getAllValues()) {
|
||||
capturedPaths.add(fileToPath(captured.getFile()));
|
||||
}
|
||||
Assert.assertEquals("Image file check.",
|
||||
Joiner.on(",").join(filesToPaths(tc.expectedPurgedImages)),
|
||||
Joiner.on(",").join(purgedPaths));
|
||||
Joiner.on(",").join(capturedPaths));
|
||||
|
||||
// Check images
|
||||
purgedPaths.clear();
|
||||
for (EditLogFile purged : logsPurgedCaptor.getAllValues()) {
|
||||
purgedPaths.add(fileToPath(purged.getFile()));
|
||||
}
|
||||
Assert.assertEquals(
|
||||
capturedPaths.clear();
|
||||
// Check edit logs, and also in progress edits older than minTxIdToKeep
|
||||
for (EditLogFile captured : logsPurgedCaptor.getAllValues()) {
|
||||
capturedPaths.add(fileToPath(captured.getFile()));
|
||||
}
|
||||
Assert.assertEquals("Check old edits are removed.",
|
||||
Joiner.on(",").join(filesToPaths(tc.expectedPurgedLogs)),
|
||||
Joiner.on(",").join(purgedPaths));
|
||||
Joiner.on(",").join(capturedPaths));
|
||||
|
||||
capturedPaths.clear();
|
||||
// Check in progress edits to keep are marked as stale
|
||||
for (EditLogFile captured : staleLogsCaptor.getAllValues()) {
|
||||
capturedPaths.add(fileToPath(captured.getFile()));
|
||||
}
|
||||
Assert.assertEquals("Check unnecessary but kept edits are marked stale",
|
||||
Joiner.on(",").join(filesToPaths(tc.expectedStaleLogs)),
|
||||
Joiner.on(",").join(capturedPaths));
|
||||
}
|
||||
|
||||
|
||||
private class TestCaseDescription {
|
||||
private final Map<File, FakeRoot> dirRoots = Maps.newLinkedHashMap();
|
||||
private final Set<File> expectedPurgedLogs = Sets.newLinkedHashSet();
|
||||
private final Set<File> expectedPurgedImages = Sets.newLinkedHashSet();
|
||||
|
||||
private final Set<File> expectedStaleLogs = Sets.newLinkedHashSet();
|
||||
|
||||
private class FakeRoot {
|
||||
final NameNodeDirType type;
|
||||
final List<File> files;
|
||||
|
@ -306,13 +364,20 @@ public class TestNNStorageRetentionManager {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void addLog(String path, boolean expectPurge) {
|
||||
addLog(path, expectPurge, false);
|
||||
}
|
||||
|
||||
void addLog(String path, boolean expectPurge, boolean expectStale) {
|
||||
File file = new File(path);
|
||||
addFile(file);
|
||||
if (expectPurge) {
|
||||
expectedPurgedLogs.add(file);
|
||||
}
|
||||
if (expectStale) {
|
||||
expectedStaleLogs.add(file);
|
||||
}
|
||||
}
|
||||
|
||||
void addImage(String path, boolean expectPurge) {
|
||||
|
@ -330,7 +395,22 @@ public class TestNNStorageRetentionManager {
|
|||
}
|
||||
return mockStorageForDirs(sds.toArray(new StorageDirectory[0]));
|
||||
}
|
||||
|
||||
|
||||
private File findLastInProgressEdit(FakeRoot root){
|
||||
Pattern p = Pattern.compile(
|
||||
NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
|
||||
ToLongFunction<File> fileNameToTxId =
|
||||
f -> {
|
||||
Matcher m = p.matcher(f.getName());
|
||||
return m.matches() ?
|
||||
Long.parseLong(m.group(1)):
|
||||
HdfsServerConstants.INVALID_TXID;
|
||||
};
|
||||
return root.files.stream().
|
||||
sorted(Comparator.comparingLong(fileNameToTxId).reversed()).
|
||||
findFirst().orElse(null);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public FSEditLog mockEditLog(StoragePurger purger) throws IOException {
|
||||
final List<JournalManager> jms = Lists.newArrayList();
|
||||
|
@ -341,36 +421,28 @@ public class TestNNStorageRetentionManager {
|
|||
// passing null NNStorage for unit test because it does not use it
|
||||
FileJournalManager fjm = new FileJournalManager(conf,
|
||||
root.mockStorageDir(), null);
|
||||
fjm.currentInProgress = findLastInProgressEdit(root);
|
||||
fjm.purger = purger;
|
||||
jms.add(fjm);
|
||||
journalSet.add(fjm, false);
|
||||
}
|
||||
|
||||
FSEditLog mockLog = Mockito.mock(FSEditLog.class);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
Object[] args = invocation.getArguments();
|
||||
assert args.length == 1;
|
||||
long txId = (Long) args[0];
|
||||
|
||||
for (JournalManager jm : jms) {
|
||||
jm.purgeLogsOlderThan(txId);
|
||||
}
|
||||
return null;
|
||||
Mockito.doAnswer(invocation -> {
|
||||
Object[] args = invocation.getArguments();
|
||||
assert args.length == 1;
|
||||
long txId = (Long) args[0];
|
||||
for (JournalManager jm : jms) {
|
||||
jm.purgeLogsOlderThan(txId);
|
||||
}
|
||||
return null;
|
||||
}).when(mockLog).purgeLogsOlderThan(Mockito.anyLong());
|
||||
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
Object[] args = invocation.getArguments();
|
||||
journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
|
||||
(Long)args[1], (Boolean)args[2], (Boolean)args[3]);
|
||||
return null;
|
||||
}
|
||||
Mockito.doAnswer(invocation -> {
|
||||
Object[] args = invocation.getArguments();
|
||||
journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
|
||||
(Long)args[1], (Boolean)args[2], (Boolean)args[3]);
|
||||
return null;
|
||||
}).when(mockLog).selectInputStreams(Mockito.anyCollection(),
|
||||
Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
|
||||
return mockLog;
|
||||
|
@ -401,21 +473,17 @@ public class TestNNStorageRetentionManager {
|
|||
return paths;
|
||||
}
|
||||
|
||||
private static NNStorage mockStorageForDirs(final StorageDirectory ... mockDirs)
|
||||
private static NNStorage mockStorageForDirs(final StorageDirectory... mockDirs)
|
||||
throws IOException {
|
||||
NNStorage mockStorage = Mockito.mock(NNStorage.class);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
FSImageStorageInspector inspector =
|
||||
Mockito.doAnswer(invocation -> {
|
||||
FSImageStorageInspector inspector =
|
||||
(FSImageStorageInspector) invocation.getArguments()[0];
|
||||
for (StorageDirectory sd : mockDirs) {
|
||||
inspector.inspectDirectory(sd);
|
||||
}
|
||||
return null;
|
||||
for (StorageDirectory sd : mockDirs) {
|
||||
inspector.inspectDirectory(sd);
|
||||
}
|
||||
}).when(mockStorage).inspectStorageDirs(
|
||||
Mockito.<FSImageStorageInspector>anyObject());
|
||||
return null;
|
||||
}).when(mockStorage).inspectStorageDirs(Mockito.any());
|
||||
return mockStorage;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue