HDFS-2946. HA: Put a cap on the number of completed edits files retained by the NN. Contributed by Aaron T. Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1398608 13f79535-47bb-0310-9956-ffa450edef68
parent 32728e64bb
commit b6d441277c
CHANGES.txt
@@ -57,6 +57,9 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4036. Remove "throw UnresolvedLinkException" from
     FSDirectory.unprotectedAddFile(..). (Jing Zhao via szetszwo)
 
+    HDFS-2946. HA: Put a cap on the number of completed edits files retained
+    by the NN. (atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
DFSConfigKeys.java
@@ -162,6 +162,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT = 2;
   public static final String DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY = "dfs.namenode.num.extra.edits.retained";
   public static final int DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT = 1000000; //1M
+  public static final String DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY = "dfs.namenode.max.extra.edits.segments.retained";
+  public static final int DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT = 10000; // 10k
   public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY = "dfs.namenode.min.supported.datanode.version";
   public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT = "2.0.0-SNAPSHOT";
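Taken together, the existing num.extra.edits.retained budget and the new segments cap bound both the transaction count and the file count of retained edits. A minimal sketch of setting both knobs through the standard Hadoop Configuration API (the class name and the values here are illustrative, not recommendations):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class RetentionConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Keep up to half a million extra transactions beyond what a restart
    // strictly needs (illustrative value; the default is 1,000,000)...
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 500000L);
    // ...but never more than 5,000 extra finalized segment files
    // (illustrative value; the default is 10,000).
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY, 5000);
  }
}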
FSEditLog.java
@@ -1071,6 +1071,11 @@ public class FSEditLog implements LogsPurgeable {
       // All journals have failed, it is handled in logSync.
     }
   }
 
+  public void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxId, boolean inProgressOk) throws IOException {
+    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+  }
+
   public Collection<EditLogInputStream> selectInputStreams(
       long fromTxId, long toAtLeastTxId) throws IOException {
@@ -1088,7 +1093,7 @@ public class FSEditLog implements LogsPurgeable {
       long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
       boolean inProgressOk) throws IOException {
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
-    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+    selectInputStreams(streams, fromTxId, inProgressOk);
 
     try {
       checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
FileJournalManager.java
@@ -232,6 +232,11 @@ class FileJournalManager implements JournalManager {
     LOG.debug(this + ": selecting input streams starting at " + fromTxId +
         (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
         "from among " + elfs.size() + " candidate file(s)");
+    addStreamsToCollectionFromFiles(elfs, streams, fromTxId, inProgressOk);
+  }
+
+  static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs,
+      Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) {
     for (EditLogFile elf : elfs) {
       if (elf.lastTxId < fromTxId) {
         LOG.debug("passing over " + elf + " because it ends at " +
JournalManager.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collection;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -46,19 +45,6 @@ public interface JournalManager extends Closeable, LogsPurgeable {
    */
   void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException;
 
-  /**
-   * Get a list of edit log input streams. The list will start with the
-   * stream that contains fromTxnId, and continue until the end of the journal
-   * being managed.
-   *
-   * @param fromTxnId the first transaction id we want to read
-   * @param inProgressOk whether or not in-progress streams should be returned
-   *
-   * @return a list of streams
-   */
-  void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxnId, boolean inProgressOk);
-
   /**
    * Set the amount of memory that this stream should use to buffer edits
    */
JournalSet.java
@@ -216,7 +216,7 @@ public class JournalSet implements JournalManager {
    */
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk) {
+      long fromTxId, boolean inProgressOk) throws IOException {
     final TreeMultiset<EditLogInputStream> allStreams =
         TreeMultiset.create(EDIT_LOG_INPUT_STREAM_COMPARATOR);
     for (JournalAndStream jas : journals) {
LogsPurgeable.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Collection;
 
 /**
  * Interface used to abstract over classes which manage edit logs that may need
@@ -33,5 +34,20 @@ interface LogsPurgeable {
    * @throws IOException in the event of error
    */
   public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
+
+  /**
+   * Get a list of edit log input streams. The list will start with the
+   * stream that contains fromTxnId, and continue until the end of the journal
+   * being managed.
+   *
+   * @param fromTxId the first transaction id we want to read
+   * @param inProgressOk whether or not in-progress streams should be returned
+   *
+   * @return a list of streams
+   * @throws IOException if the underlying storage has an error or is otherwise
+   *                     inaccessible
+   */
+  void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxId, boolean inProgressOk) throws IOException;
 
 }
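To make this contract concrete, here is a toy, self-contained Java model of a LogsPurgeable-style journal (Segment and ListBackedJournal are invented names, not HDFS code): selection returns every segment that may still contain transactions at or after fromTxId, and purging drops only segments that end before the minimum txid to keep.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Toy stand-in for an edit log segment: just a txid range.
class Segment {
  final long firstTxId, lastTxId;
  Segment(long firstTxId, long lastTxId) {
    this.firstTxId = firstTxId;
    this.lastTxId = lastTxId;
  }
}

class ListBackedJournal {
  private final List<Segment> segments = new ArrayList<Segment>();

  void addSegment(long firstTxId, long lastTxId) {
    segments.add(new Segment(firstTxId, lastTxId));
  }

  // Analogue of selectInputStreams(streams, fromTxId, inProgressOk):
  // a segment is selected unless it ends before fromTxId.
  void selectSegments(Collection<Segment> out, long fromTxId) {
    for (Segment s : segments) {
      if (s.lastTxId >= fromTxId) {
        out.add(s);
      }
    }
  }

  // Analogue of purgeLogsOlderThan(minTxIdToKeep): a segment survives
  // as long as it contains any txid at or above the threshold.
  void purgeOlderThan(long minTxIdToKeep) {
    segments.removeIf(s -> s.lastTxId < minTxIdToKeep);
  }
}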
NNStorageRetentionManager.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.TreeSet;
 
@@ -32,6 +34,7 @@ import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -48,6 +51,7 @@ public class NNStorageRetentionManager {
 
   private final int numCheckpointsToRetain;
   private final long numExtraEditsToRetain;
+  private final int maxExtraEditsSegmentsToRetain;
   private static final Log LOG = LogFactory.getLog(
       NNStorageRetentionManager.class);
   private final NNStorage storage;
@@ -65,6 +69,9 @@ public class NNStorageRetentionManager {
     this.numExtraEditsToRetain = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY,
         DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT);
+    this.maxExtraEditsSegmentsToRetain = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT);
     Preconditions.checkArgument(numCheckpointsToRetain > 0,
         "Must retain at least one checkpoint");
     Preconditions.checkArgument(numExtraEditsToRetain >= 0,
@@ -94,7 +101,39 @@ public class NNStorageRetentionManager {
     // provide a "cushion" of older txns that we keep, which is
     // handy for HA, where a remote node may not have as many
     // new images.
-    long purgeLogsFrom = Math.max(0, minImageTxId + 1 - numExtraEditsToRetain);
+    //
+    // First, determine the target number of extra transactions to retain based
+    // on the configured amount.
+    long minimumRequiredTxId = minImageTxId + 1;
+    long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
+
+    ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
+    purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false);
+    Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
+      @Override
+      public int compare(EditLogInputStream a, EditLogInputStream b) {
+        return ComparisonChain.start()
+            .compare(a.getFirstTxId(), b.getFirstTxId())
+            .compare(a.getLastTxId(), b.getLastTxId())
+            .result();
+      }
+    });
+
+    // Next, adjust the number of transactions to retain if doing so would mean
+    // keeping too many segments around.
+    while (editLogs.size() > maxExtraEditsSegmentsToRetain) {
+      purgeLogsFrom = editLogs.get(0).getFirstTxId();
+      editLogs.remove(0);
+    }
+
+    // Finally, ensure that we're not trying to purge any transactions that we
+    // actually need.
+    if (purgeLogsFrom > minimumRequiredTxId) {
+      throw new AssertionError("Should not purge more edits than required to "
+          + "restore: " + purgeLogsFrom + " should be <= "
+          + minimumRequiredTxId);
+    }
+
     purgeableLogs.purgeLogsOlderThan(purgeLogsFrom);
   }
 
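The three steps above can be traced with a standalone sketch (plain Java, not part of the commit; PurgePointSketch and purgePoint are invented names, and segments are modeled as {firstTxId, lastTxId} pairs). The numbers mirror testRetainExtraLogsLimitedSegments further down: minImageTxId = 300, a 150-transaction cushion, and a cap of 2 extra segments.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PurgePointSketch {
  static long purgePoint(long minImageTxId, long numExtraEditsToRetain,
      int maxExtraEditsSegmentsToRetain, List<long[]> sortedSegments) {
    long minimumRequiredTxId = minImageTxId + 1;
    // Step 1: back the purge point off by the configured edit cushion.
    long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
    // Step 2: collect the segments that would survive, mirroring
    // selectInputStreams(editLogs, purgeLogsFrom, false).
    List<long[]> editLogs = new ArrayList<long[]>();
    for (long[] s : sortedSegments) {
      if (s[1] >= purgeLogsFrom) {
        editLogs.add(s);
      }
    }
    // Step 3: while too many segments remain, advance the purge point. The
    // loop leaves purgeLogsFrom at the first txid of the last segment it
    // dropped from the list, so that segment itself still survives the
    // purge; only segments ending before it are removed.
    while (editLogs.size() > maxExtraEditsSegmentsToRetain) {
      purgeLogsFrom = editLogs.get(0)[0];
      editLogs.remove(0);
    }
    if (purgeLogsFrom > minimumRequiredTxId) {
      throw new AssertionError("would purge required edits");
    }
    return purgeLogsFrom;
  }

  public static void main(String[] args) {
    List<long[]> segments = Arrays.asList(
        new long[] { 1, 100 }, new long[] { 101, 175 },
        new long[] { 176, 200 }, new long[] { 201, 225 },
        new long[] { 226, 240 }, new long[] { 241, 275 },
        new long[] { 276, 300 }, new long[] { 301, 400 });
    // Prints 241: the segments ending at 175, 200, 225, and 240 get purged,
    // leaving 241-275 and 276-300 as the two extra segments.
    System.out.println(purgePoint(300, 150, 2, segments));
  }
}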
SecondaryNameNode.java
@@ -751,6 +751,24 @@ public class SecondaryNameNode implements Runnable {
       }
     }
   }
+
+  @Override
+  public void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxId, boolean inProgressOk) {
+    Iterator<StorageDirectory> iter = storage.dirIterator();
+    while (iter.hasNext()) {
+      StorageDirectory dir = iter.next();
+      List<EditLogFile> editFiles;
+      try {
+        editFiles = FileJournalManager.matchEditLogs(
+            dir.getCurrentDir());
+      } catch (IOException ioe) {
+        throw new RuntimeException(ioe);
+      }
+      FileJournalManager.addStreamsToCollectionFromFiles(editFiles, streams,
+          fromTxId, inProgressOk);
+    }
+  }
 
 }
hdfs-default.xml
@@ -655,6 +655,20 @@
   edits in order to start again.
   Typically each edit is on the order of a few hundred bytes, so the default
   of 1 million edits should be on the order of hundreds of MBs or low GBs.
+
+  NOTE: Fewer extra edits may be retained than value specified for this setting
+  if doing so would mean that more segments would be retained than the number
+  configured by dfs.namenode.max.extra.edits.segments.retained.
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.max.extra.edits.segments.retained</name>
+  <value>10000</value>
+  <description>The maximum number of extra edit log segments which should be retained
+  beyond what is minimally necessary for a NN restart. When used in conjunction with
+  dfs.namenode.num.extra.edits.retained, this configuration property serves to cap
+  the number of extra edits files to a reasonable value.
+  </description>
+</property>
+
TestNNStorageRetentionManager.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getImageFileName;
 import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -196,6 +197,35 @@ public class TestNNStorageRetentionManager {
     runTest(tc);
   }
 
+  @Test
+  public void testRetainExtraLogsLimitedSegments() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY,
+        150);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY, 2);
+    TestCaseDescription tc = new TestCaseDescription();
+    tc.addRoot("/foo1", NameNodeDirType.IMAGE);
+    tc.addRoot("/foo2", NameNodeDirType.EDITS);
+    tc.addImage("/foo1/current/" + getImageFileName(100), true);
+    tc.addImage("/foo1/current/" + getImageFileName(200), true);
+    tc.addImage("/foo1/current/" + getImageFileName(300), false);
+    tc.addImage("/foo1/current/" + getImageFileName(400), false);
+
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(1, 100), true);
+    // Without lowering the max segments to retain, we'd retain all segments
+    // going back to txid 150 (300 - 150).
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(101, 175), true);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(176, 200), true);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(201, 225), true);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(226, 240), true);
+    // Only retain 2 extra segments. The 301-400 segment is considered required,
+    // not extra.
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(241, 275), false);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(276, 300), false);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(301, 400), false);
+    tc.addLog("/foo2/current/" + getInProgressEditsFileName(401), false);
+    runTest(tc);
+  }
+
   private void runTest(TestCaseDescription tc) throws IOException {
     StoragePurger mockPurger =
         Mockito.mock(NNStorageRetentionManager.StoragePurger.class);
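A quick trace of the expectations above (arithmetic implied by the test data, spelled out here): the two newest checkpoints, 300 and 400, are retained, so the minimum image txid is 300 and every transaction from 301 on is required. Retaining 150 extra edits alone would put the purge point at txid 151 and keep six extra segments; the cap of 2 advances it to txid 241, so the segments ending at 175, 200, 225, and 240 are expected to be purged (the true flags), while 241-275 and 276-300 survive as the two extra segments.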
@@ -287,8 +317,10 @@ public class TestNNStorageRetentionManager {
     return mockStorageForDirs(sds.toArray(new StorageDirectory[0]));
   }
 
-  public FSEditLog mockEditLog(StoragePurger purger) {
+  @SuppressWarnings("unchecked")
+  public FSEditLog mockEditLog(StoragePurger purger) throws IOException {
     final List<JournalManager> jms = Lists.newArrayList();
+    final JournalSet journalSet = new JournalSet(0);
     for (FakeRoot root : dirRoots.values()) {
       if (!root.type.isOfType(NameNodeDirType.EDITS)) continue;
 
@@ -297,6 +329,7 @@ public class TestNNStorageRetentionManager {
           root.mockStorageDir(), null);
       fjm.purger = purger;
       jms.add(fjm);
+      journalSet.add(fjm, false);
     }
 
     FSEditLog mockLog = Mockito.mock(FSEditLog.class);
@@ -314,6 +347,18 @@ public class TestNNStorageRetentionManager {
         return null;
       }
     }).when(mockLog).purgeLogsOlderThan(Mockito.anyLong());
+
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
+            (long)((Long)args[1]), (boolean)((Boolean)args[2]));
+        return null;
+      }
+    }).when(mockLog).selectInputStreams(Mockito.anyCollection(),
+        Mockito.anyLong(), Mockito.anyBoolean());
+
     return mockLog;
   }
 }