HDFS-4238. Standby namenode should not do purging of shared storage edits. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1417652 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-12-05 21:18:16 +00:00
parent df9d2b0394
commit 46f2591a9e
4 changed files with 41 additions and 5 deletions

View File

@ -292,6 +292,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-4231. BackupNode: Introduce BackupState. (shv) HDFS-4231. BackupNode: Introduce BackupState. (shv)
HDFS-4238. Standby namenode should not do purging of shared
storage edits. (todd)
BREAKDOWN OF HDFS-3077 SUBTASKS BREAKDOWN OF HDFS-3077 SUBTASKS
HDFS-3077. Quorum-based protocol for reading and writing edit logs. HDFS-3077. Quorum-based protocol for reading and writing edit logs.

View File

@ -858,6 +858,11 @@ public class FSEditLog implements LogsPurgeable {
return journalSet; return journalSet;
} }
@VisibleForTesting
synchronized void setJournalSetForTesting(JournalSet js) {
this.journalSet = js;
}
/** /**
* Used only by tests. * Used only by tests.
*/ */
@ -978,9 +983,18 @@ public class FSEditLog implements LogsPurgeable {
/** /**
* Archive any log files that are older than the given txid. * Archive any log files that are older than the given txid.
*
* If the edit log is not open for write, then this call returns with no
* effect.
*/ */
@Override @Override
public synchronized void purgeLogsOlderThan(final long minTxIdToKeep) { public synchronized void purgeLogsOlderThan(final long minTxIdToKeep) {
// Should not purge logs unless they are open for write.
// This prevents the SBN from purging logs on shared storage, for example.
if (!isOpenForWrite()) {
return;
}
assert curSegmentTxId == HdfsConstants.INVALID_TXID || // on format this is no-op assert curSegmentTxId == HdfsConstants.INVALID_TXID || // on format this is no-op
minTxIdToKeep <= curSegmentTxId : minTxIdToKeep <= curSegmentTxId :
"cannot purge logs older than txid " + minTxIdToKeep + "cannot purge logs older than txid " + minTxIdToKeep +

View File

@ -179,6 +179,13 @@ public class NameNodeAdapter {
return spy; return spy;
} }
public static JournalSet spyOnJournalSet(NameNode nn) {
FSEditLog editLog = nn.getFSImage().getEditLog();
JournalSet js = Mockito.spy(editLog.getJournalSet());
editLog.setJournalSetForTesting(js);
return js;
}
public static String getMkdirOpPath(FSEditLogOp op) { public static String getMkdirOpPath(FSEditLogOp op) {
if (op.opCode == FSEditLogOpCodes.OP_MKDIR) { if (op.opCode == FSEditLogOpCodes.OP_MKDIR) {
return ((MkdirOp) op).path; return ((MkdirOp) op).path;

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.JournalSet;
import org.apache.hadoop.hdfs.server.namenode.NNStorage; import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@ -66,6 +67,12 @@ public class TestStandbyCheckpoints {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5); conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
// Dial down the retention of extra edits and checkpoints. This is to
// help catch regressions of HDFS-4238 (SBN should not purge shared edits)
conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 0);
conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true); conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY, conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
SlowCodec.class.getCanonicalName()); SlowCodec.class.getCanonicalName());
@ -99,15 +106,20 @@ public class TestStandbyCheckpoints {
@Test @Test
public void testSBNCheckpoints() throws Exception { public void testSBNCheckpoints() throws Exception {
doEdits(0, 10); JournalSet standbyJournalSet = NameNodeAdapter.spyOnJournalSet(nn1);
doEdits(0, 10);
HATestUtil.waitForStandbyToCatchUp(nn0, nn1); HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
// Once the standby catches up, it should notice that it needs to // Once the standby catches up, it should notice that it needs to
// do a checkpoint and save one to its local directories. // do a checkpoint and save one to its local directories.
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 12)); HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
// It should also upload it back to the active. // It should also upload it back to the active.
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 12)); HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
// The standby should never try to purge edit logs on shared storage.
Mockito.verify(standbyJournalSet, Mockito.never()).
purgeLogsOlderThan(Mockito.anyLong());
} }
/** /**
@ -129,8 +141,8 @@ public class TestStandbyCheckpoints {
// so the standby will catch up. Then, both will be in standby mode // so the standby will catch up. Then, both will be in standby mode
// with enough uncheckpointed txns to cause a checkpoint, and they // with enough uncheckpointed txns to cause a checkpoint, and they
// will each try to take a checkpoint and upload to each other. // will each try to take a checkpoint and upload to each other.
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 12)); HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 12)); HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
assertEquals(12, nn0.getNamesystem().getFSImage() assertEquals(12, nn0.getNamesystem().getFSImage()
.getMostRecentCheckpointTxId()); .getMostRecentCheckpointTxId());