HDFS-2946. HA: Put a cap on the number of completed edits files retained by the NN. Contributed by Aaron T. Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1398609 13f79535-47bb-0310-9956-ffa450edef68
parent f5f763eb15
commit 557ffe2101
@@ -386,6 +386,9 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4036. Remove "throw UnresolvedLinkException" from
     FSDirectory.unprotectedAddFile(..). (Jing Zhao via szetszwo)
 
+    HDFS-2946. HA: Put a cap on the number of completed edits files retained
+    by the NN. (atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -162,6 +162,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int    DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT = 2;
   public static final String DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY = "dfs.namenode.num.extra.edits.retained";
   public static final int    DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT = 1000000; //1M
+  public static final String DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY = "dfs.namenode.max.extra.edits.segments.retained";
+  public static final int    DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT = 10000; // 10k
   public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY = "dfs.namenode.min.supported.datanode.version";
   public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
 
@@ -1175,6 +1175,11 @@ public class FSEditLog implements LogsPurgeable {
       }
     }
 
+  public void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxId, boolean inProgressOk) {
+    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+  }
+
   public Collection<EditLogInputStream> selectInputStreams(
       long fromTxId, long toAtLeastTxId) throws IOException {
     return selectInputStreams(fromTxId, toAtLeastTxId, null, true);

@@ -1191,7 +1196,7 @@ public class FSEditLog implements LogsPurgeable {
       long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
       boolean inProgressOk) throws IOException {
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
-    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+    selectInputStreams(streams, fromTxId, inProgressOk);
 
     try {
       checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
@@ -247,6 +247,11 @@ public class FileJournalManager implements JournalManager {
     LOG.debug(this + ": selecting input streams starting at " + fromTxId +
         (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
         "from among " + elfs.size() + " candidate file(s)");
+    addStreamsToCollectionFromFiles(elfs, streams, fromTxId, inProgressOk);
+  }
+
+  static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs,
+      Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) {
     for (EditLogFile elf : elfs) {
       if (elf.isInProgress()) {
         if (!inProgressOk) {
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collection;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;

@@ -56,21 +55,6 @@ public interface JournalManager extends Closeable, FormatConfirmable,
    */
   void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException;
 
-  /**
-   * Get a list of edit log input streams. The list will start with the
-   * stream that contains fromTxnId, and continue until the end of the journal
-   * being managed.
-   *
-   * @param fromTxnId the first transaction id we want to read
-   * @param inProgressOk whether or not in-progress streams should be returned
-   *
-   * @return a list of streams
-   * @throws IOException if the underlying storage has an error or is otherwise
-   *                     inaccessible
-   */
-  void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxnId, boolean inProgressOk) throws IOException;
-
   /**
    * Set the amount of memory that this stream should use to buffer edits
    */
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Collection;
 
 /**
  * Interface used to abstract over classes which manage edit logs that may need

@@ -34,4 +35,19 @@ interface LogsPurgeable {
    */
   public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
 
+  /**
+   * Get a list of edit log input streams. The list will start with the
+   * stream that contains fromTxnId, and continue until the end of the journal
+   * being managed.
+   *
+   * @param fromTxId the first transaction id we want to read
+   * @param inProgressOk whether or not in-progress streams should be returned
+   *
+   * @return a list of streams
+   * @throws IOException if the underlying storage has an error or is otherwise
+   *                     inaccessible
+   */
+  void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxId, boolean inProgressOk) throws IOException;
+
 }
@@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.TreeSet;
 

@@ -32,6 +34,7 @@ import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -48,6 +51,7 @@ public class NNStorageRetentionManager {
 
   private final int numCheckpointsToRetain;
   private final long numExtraEditsToRetain;
+  private final int maxExtraEditsSegmentsToRetain;
   private static final Log LOG = LogFactory.getLog(
       NNStorageRetentionManager.class);
   private final NNStorage storage;
@@ -65,6 +69,9 @@ public class NNStorageRetentionManager {
     this.numExtraEditsToRetain = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY,
         DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT);
+    this.maxExtraEditsSegmentsToRetain = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT);
     Preconditions.checkArgument(numCheckpointsToRetain > 0,
         "Must retain at least one checkpoint");
     Preconditions.checkArgument(numExtraEditsToRetain >= 0,
@@ -94,7 +101,39 @@ public class NNStorageRetentionManager {
     // provide a "cushion" of older txns that we keep, which is
     // handy for HA, where a remote node may not have as many
     // new images.
-    long purgeLogsFrom = Math.max(0, minImageTxId + 1 - numExtraEditsToRetain);
+    //
+    // First, determine the target number of extra transactions to retain based
+    // on the configured amount.
+    long minimumRequiredTxId = minImageTxId + 1;
+    long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
+
+    ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
+    purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false);
+    Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
+      @Override
+      public int compare(EditLogInputStream a, EditLogInputStream b) {
+        return ComparisonChain.start()
+            .compare(a.getFirstTxId(), b.getFirstTxId())
+            .compare(a.getLastTxId(), b.getLastTxId())
+            .result();
+      }
+    });
+
+    // Next, adjust the number of transactions to retain if doing so would mean
+    // keeping too many segments around.
+    while (editLogs.size() > maxExtraEditsSegmentsToRetain) {
+      purgeLogsFrom = editLogs.get(0).getFirstTxId();
+      editLogs.remove(0);
+    }
+
+    // Finally, ensure that we're not trying to purge any transactions that we
+    // actually need.
+    if (purgeLogsFrom > minimumRequiredTxId) {
+      throw new AssertionError("Should not purge more edits than required to "
+          + "restore: " + purgeLogsFrom + " should be <= "
+          + minimumRequiredTxId);
+    }
+
     purgeableLogs.purgeLogsOlderThan(purgeLogsFrom);
   }
 
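In outline, the new purge calculation proceeds in three steps: start the purge floor numExtraEditsToRetain transactions below the oldest retained checkpoint, then, while more than maxExtraEditsSegmentsToRetain finalized segments would survive, move the floor to the start of the next-oldest segment and drop that segment from consideration, and finally assert that the floor never passes the first transaction a restart actually needs. A minimal standalone sketch of the same arithmetic, using a hypothetical helper that takes segment boundaries as {firstTxId, lastTxId} pairs (illustrative only, not code from this patch):

    import java.util.ArrayList;
    import java.util.List;

    class RetentionSketch {
      // Mirrors the purgeOldStorage() logic above: returns the first txid to keep.
      // sortedSegments lists finalized segments from the initial floor onward,
      // matching selectInputStreams(..., purgeLogsFrom, false).
      static long computePurgeLogsFrom(long minImageTxId, long numExtraEditsToRetain,
          int maxExtraEditsSegmentsToRetain, List<long[]> sortedSegments) {
        long minimumRequiredTxId = minImageTxId + 1;
        long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
        List<long[]> extra = new ArrayList<long[]>(sortedSegments);
        while (extra.size() > maxExtraEditsSegmentsToRetain) {
          // Move the floor to the start of the segment being dropped.
          purgeLogsFrom = extra.remove(0)[0];
        }
        if (purgeLogsFrom > minimumRequiredTxId) {
          throw new AssertionError("would purge edits required for restart");
        }
        return purgeLogsFrom;
      }
    }

Note that the segment list is built with inProgressOk set to false, so only finalized edit files count against the segment cap.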
@@ -752,6 +752,24 @@ public class SecondaryNameNode implements Runnable {
       }
     }
+
+    @Override
+    public void selectInputStreams(Collection<EditLogInputStream> streams,
+        long fromTxId, boolean inProgressOk) {
+      Iterator<StorageDirectory> iter = storage.dirIterator();
+      while (iter.hasNext()) {
+        StorageDirectory dir = iter.next();
+        List<EditLogFile> editFiles;
+        try {
+          editFiles = FileJournalManager.matchEditLogs(
+              dir.getCurrentDir());
+        } catch (IOException ioe) {
+          throw new RuntimeException(ioe);
+        }
+        FileJournalManager.addStreamsToCollectionFromFiles(editFiles, streams,
+            fromTxId, inProgressOk);
+      }
+    }
 
   }
 
   /**
@@ -660,6 +660,20 @@
   edits in order to start again.
   Typically each edit is on the order of a few hundred bytes, so the default
   of 1 million edits should be on the order of hundreds of MBs or low GBs.
+
+  NOTE: Fewer extra edits may be retained than value specified for this setting
+  if doing so would mean that more segments would be retained than the number
+  configured by dfs.namenode.max.extra.edits.segments.retained.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.max.extra.edits.segments.retained</name>
+  <value>10000</value>
+  <description>The maximum number of extra edit log segments which should be retained
+  beyond what is minimally necessary for a NN restart. When used in conjunction with
+  dfs.namenode.num.extra.edits.retained, this configuration property serves to cap
+  the number of extra edits files to a reasonable value.
+  </description>
+</property>
 
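The same pair of settings can also be applied programmatically through the keys this change adds to DFSConfigKeys. A minimal sketch, with illustrative values borrowed from the new unit test rather than recommended production settings:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class RetentionConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Keep roughly 150 transactions beyond what a restart strictly needs...
        conf.setLong(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 150);
        // ...but never more than 2 extra finalized edit log segments.
        conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY, 2);
      }
    }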
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getImageFileName;
 import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -196,6 +197,35 @@ public class TestNNStorageRetentionManager {
     runTest(tc);
   }
 
+  @Test
+  public void testRetainExtraLogsLimitedSegments() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY,
+        150);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY, 2);
+    TestCaseDescription tc = new TestCaseDescription();
+    tc.addRoot("/foo1", NameNodeDirType.IMAGE);
+    tc.addRoot("/foo2", NameNodeDirType.EDITS);
+    tc.addImage("/foo1/current/" + getImageFileName(100), true);
+    tc.addImage("/foo1/current/" + getImageFileName(200), true);
+    tc.addImage("/foo1/current/" + getImageFileName(300), false);
+    tc.addImage("/foo1/current/" + getImageFileName(400), false);
+
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(1, 100), true);
+    // Without lowering the max segments to retain, we'd retain all segments
+    // going back to txid 150 (300 - 150).
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(101, 175), true);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(176, 200), true);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(201, 225), true);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(226, 240), true);
+    // Only retain 2 extra segments. The 301-400 segment is considered required,
+    // not extra.
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(241, 275), false);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(276, 300), false);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(301, 400), false);
+    tc.addLog("/foo2/current/" + getInProgressEditsFileName(401), false);
+    runTest(tc);
+  }
+
   private void runTest(TestCaseDescription tc) throws IOException {
     StoragePurger mockPurger =
         Mockito.mock(NNStorageRetentionManager.StoragePurger.class);
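Worked through by hand, the expected flags in testRetainExtraLogsLimitedSegments follow from the retention arithmetic above: two checkpoints are retained, so the oldest kept image is at txid 300 and minimumRequiredTxId is 301; retaining 150 extra edits puts the initial purge floor at 301 - 150 = 151; the cap of 2 extra segments then moves the floor to 241 (the start of the 241-275 segment), so every finalized segment ending before txid 241 (1-100 through 226-240) is expected to be purged, while 241-275, 276-300, the required 301-400 segment, and the in-progress segment starting at 401 are retained.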
@@ -287,8 +317,10 @@ public class TestNNStorageRetentionManager {
     return mockStorageForDirs(sds.toArray(new StorageDirectory[0]));
   }
 
+  @SuppressWarnings("unchecked")
   public FSEditLog mockEditLog(StoragePurger purger) {
     final List<JournalManager> jms = Lists.newArrayList();
+    final JournalSet journalSet = new JournalSet(0);
     for (FakeRoot root : dirRoots.values()) {
       if (!root.type.isOfType(NameNodeDirType.EDITS)) continue;
 

@@ -297,6 +329,7 @@ public class TestNNStorageRetentionManager {
           root.mockStorageDir(), null);
       fjm.purger = purger;
       jms.add(fjm);
+      journalSet.add(fjm, false);
     }
 
     FSEditLog mockLog = Mockito.mock(FSEditLog.class);
@@ -314,6 +347,18 @@ public class TestNNStorageRetentionManager {
         return null;
       }
     }).when(mockLog).purgeLogsOlderThan(Mockito.anyLong());
+
+    Mockito.doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
+            (long)((Long)args[1]), (boolean)((Boolean)args[2]));
+        return null;
+      }
+    }).when(mockLog).selectInputStreams(Mockito.anyCollection(),
+        Mockito.anyLong(), Mockito.anyBoolean());
     return mockLog;
   }
 }