HDFS-3956. QJM: purge temporary files when no longer within retention period. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1387817 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f26c63df21
commit
8a3e64c77f
|
@ -80,3 +80,5 @@ HDFS-3943. QJM: remove currently-unused md5sum field (todd)
|
||||||
HDFS-3950. QJM: misc TODO cleanup, improved log messages, etc. (todd)
|
HDFS-3950. QJM: misc TODO cleanup, improved log messages, etc. (todd)
|
||||||
|
|
||||||
HDFS-3955. QJM: Make acceptRecovery() atomic. (todd)
|
HDFS-3955. QJM: Make acceptRecovery() atomic. (todd)
|
||||||
|
|
||||||
|
HDFS-3956. QJM: purge temporary files when no longer within retention period (todd)
|
||||||
|
|
|
@ -19,7 +19,11 @@ package org.apache.hadoop.hdfs.qjournal.server;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
|
@ -28,6 +32,8 @@ import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link Storage} implementation for the {@link JournalNode}.
|
* A {@link Storage} implementation for the {@link JournalNode}.
|
||||||
*
|
*
|
||||||
|
@ -40,6 +46,15 @@ class JNStorage extends Storage {
|
||||||
private final StorageDirectory sd;
|
private final StorageDirectory sd;
|
||||||
private StorageState state;
|
private StorageState state;
|
||||||
|
|
||||||
|
|
||||||
|
private static final List<Pattern> CURRENT_DIR_PURGE_REGEXES =
|
||||||
|
ImmutableList.of(
|
||||||
|
Pattern.compile("edits_\\d+-(\\d+)"),
|
||||||
|
Pattern.compile("edits_inprogress_(\\d+)(?:\\..*)?"));
|
||||||
|
|
||||||
|
private static final List<Pattern> PAXOS_DIR_PURGE_REGEXES =
|
||||||
|
ImmutableList.of(Pattern.compile("(\\d+)"));
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param logDir the path to the directory in which data will be stored
|
* @param logDir the path to the directory in which data will be stored
|
||||||
* @param errorReporter a callback to report errors
|
* @param errorReporter a callback to report errors
|
||||||
|
@ -112,6 +127,48 @@ class JNStorage extends Storage {
|
||||||
return new File(sd.getCurrentDir(), "paxos");
|
return new File(sd.getCurrentDir(), "paxos");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove any log files and associated paxos files which are older than
|
||||||
|
* the given txid.
|
||||||
|
*/
|
||||||
|
void purgeDataOlderThan(long minTxIdToKeep) throws IOException {
|
||||||
|
purgeMatching(sd.getCurrentDir(),
|
||||||
|
CURRENT_DIR_PURGE_REGEXES, minTxIdToKeep);
|
||||||
|
purgeMatching(getPaxosDir(), PAXOS_DIR_PURGE_REGEXES, minTxIdToKeep);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Purge files in the given directory which match any of the set of patterns.
|
||||||
|
* The patterns must have a single numeric capture group which determines
|
||||||
|
* the associated transaction ID of the file. Only those files for which
|
||||||
|
* the transaction ID is less than the <code>minTxIdToKeep</code> parameter
|
||||||
|
* are removed.
|
||||||
|
*/
|
||||||
|
private static void purgeMatching(File dir, List<Pattern> patterns,
|
||||||
|
long minTxIdToKeep) throws IOException {
|
||||||
|
|
||||||
|
for (File f : FileUtil.listFiles(dir)) {
|
||||||
|
if (!f.isFile()) continue;
|
||||||
|
|
||||||
|
for (Pattern p : patterns) {
|
||||||
|
Matcher matcher = p.matcher(f.getName());
|
||||||
|
if (matcher.matches()) {
|
||||||
|
// This parsing will always succeed since the group(1) is
|
||||||
|
// /\d+/ in the regex itself.
|
||||||
|
long txid = Long.valueOf(matcher.group(1));
|
||||||
|
if (txid < minTxIdToKeep) {
|
||||||
|
LOG.info("Purging no-longer needed file " + txid);
|
||||||
|
if (!f.delete()) {
|
||||||
|
LOG.warn("Unable to delete no-longer-needed data " +
|
||||||
|
f);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void format(NamespaceInfo nsInfo) throws IOException {
|
void format(NamespaceInfo nsInfo) throws IOException {
|
||||||
setStorageInfo(nsInfo);
|
setStorageInfo(nsInfo);
|
||||||
LOG.info("Formatting journal storage directory " +
|
LOG.info("Formatting journal storage directory " +
|
||||||
|
|
|
@ -27,6 +27,8 @@ import java.net.URL;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -595,8 +597,7 @@ class Journal implements Closeable {
|
||||||
checkFormatted();
|
checkFormatted();
|
||||||
checkRequest(reqInfo);
|
checkRequest(reqInfo);
|
||||||
|
|
||||||
fjm.purgeLogsOlderThan(minTxIdToKeep);
|
storage.purgeDataOlderThan(minTxIdToKeep);
|
||||||
purgePaxosDecisionsOlderThan(minTxIdToKeep);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -614,30 +615,6 @@ class Journal implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void purgePaxosDecisionsOlderThan(long minTxIdToKeep)
|
|
||||||
throws IOException {
|
|
||||||
File dir = storage.getPaxosDir();
|
|
||||||
for (File f : FileUtil.listFiles(dir)) {
|
|
||||||
if (!f.isFile()) continue;
|
|
||||||
|
|
||||||
long txid;
|
|
||||||
try {
|
|
||||||
txid = Long.valueOf(f.getName());
|
|
||||||
} catch (NumberFormatException nfe) {
|
|
||||||
LOG.warn("Unexpected non-numeric file name for " + f.getAbsolutePath());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (txid < minTxIdToKeep) {
|
|
||||||
if (!f.delete()) {
|
|
||||||
LOG.warn("Unable to delete no-longer-needed paxos decision record " +
|
|
||||||
f);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @see QJournalProtocol#getEditLogManifest(String, long)
|
* @see QJournalProtocol#getEditLogManifest(String, long)
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -234,10 +234,6 @@ public class TestQJMWithFaults {
|
||||||
|
|
||||||
QuorumJournalManager qjm = createRandomFaultyQJM(cluster, r);
|
QuorumJournalManager qjm = createRandomFaultyQJM(cluster, r);
|
||||||
try {
|
try {
|
||||||
if (txid > 100) {
|
|
||||||
qjm.purgeLogsOlderThan(txid - 100);
|
|
||||||
}
|
|
||||||
|
|
||||||
long recovered;
|
long recovered;
|
||||||
try {
|
try {
|
||||||
recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
|
recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
|
||||||
|
@ -252,6 +248,12 @@ public class TestQJMWithFaults {
|
||||||
|
|
||||||
txid = recovered + 1;
|
txid = recovered + 1;
|
||||||
|
|
||||||
|
// Periodically purge old data on disk so it's easier to look
|
||||||
|
// at failure cases.
|
||||||
|
if (txid > 100 && i % 10 == 1) {
|
||||||
|
qjm.purgeLogsOlderThan(txid - 100);
|
||||||
|
}
|
||||||
|
|
||||||
Holder<Throwable> thrown = new Holder<Throwable>(null);
|
Holder<Throwable> thrown = new Holder<Throwable>(null);
|
||||||
for (int j = 0; j < SEGMENTS_PER_WRITER; j++) {
|
for (int j = 0; j < SEGMENTS_PER_WRITER; j++) {
|
||||||
lastAcked = writeSegmentUntilCrash(cluster, qjm, txid, 4, thrown);
|
lastAcked = writeSegmentUntilCrash(cluster, qjm, txid, 4, thrown);
|
||||||
|
|
|
@ -854,6 +854,12 @@ public class TestQuorumJournalManager {
|
||||||
GenericTestUtils.assertGlobEquals(paxosDir, "\\d+",
|
GenericTestUtils.assertGlobEquals(paxosDir, "\\d+",
|
||||||
"1", "3");
|
"1", "3");
|
||||||
|
|
||||||
|
// Create some temporary files of the sort that are used during recovery.
|
||||||
|
assertTrue(new File(curDir,
|
||||||
|
"edits_inprogress_0000000000000000001.epoch=140").createNewFile());
|
||||||
|
assertTrue(new File(curDir,
|
||||||
|
"edits_inprogress_0000000000000000002.empty").createNewFile());
|
||||||
|
|
||||||
qjm.purgeLogsOlderThan(3);
|
qjm.purgeLogsOlderThan(3);
|
||||||
|
|
||||||
// Log purging is asynchronous, so we have to wait for the calls
|
// Log purging is asynchronous, so we have to wait for the calls
|
||||||
|
|
Loading…
Reference in New Issue