HDFS-2766. Test for case where standby partially reads log and then performs checkpoint. Contributed by Aaron T. Myers
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1229929 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a4f4becf52
commit
a339836bbc
|
@ -99,3 +99,5 @@ HDFS-2753. Fix standby getting stuck in safemode when blocks are written while S
|
|||
HDFS-2773. Reading edit logs from an earlier version should not leave blocks in under-construction state. (todd)
|
||||
|
||||
HDFS-2775. Fix TestStandbyCheckpoints.testBothNodesInStandbyState failing intermittently. (todd)
|
||||
|
||||
HDFS-2766. Test for case where standby partially reads log and then performs checkpoint. (atm)
|
||||
|
|
|
@ -204,7 +204,13 @@ class FileJournalManager implements JournalManager {
|
|||
}
|
||||
EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(),
|
||||
elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress());
|
||||
elfis.skipTransactions(fromTxId - elf.getFirstTxId());
|
||||
long transactionsToSkip = fromTxId - elf.getFirstTxId();
|
||||
if (transactionsToSkip > 0) {
|
||||
LOG.info(String.format("Log begins at txid %d, but requested start "
|
||||
+ "txid is %d. Skipping %d edits.", elf.getFirstTxId(), fromTxId,
|
||||
transactionsToSkip));
|
||||
elfis.skipTransactions(transactionsToSkip);
|
||||
}
|
||||
return elfis;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,9 @@ import java.util.Map.Entry;
|
|||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
|
@ -64,6 +66,8 @@ import static org.mockito.Mockito.mock;
|
|||
*/
|
||||
public abstract class FSImageTestUtil {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(FSImageTestUtil.class.getName());
|
||||
|
||||
/**
|
||||
* The position in the fsimage header where the txid is
|
||||
* written.
|
||||
|
@ -410,6 +414,8 @@ public abstract class FSImageTestUtil {
|
|||
|
||||
for (File nameDir : getNameNodeCurrentDirs(cluster, nnIdx)) {
|
||||
// Should have fsimage_N for the three checkpoints
|
||||
LOG.info("Examining storage dir " + nameDir + " with contents: "
|
||||
+ StringUtils.join(nameDir.listFiles(), ", "));
|
||||
for (long checkpointTxId : txids) {
|
||||
File image = new File(nameDir,
|
||||
NNStorage.getImageFileName(checkpointTxId));
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.io.IOException;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -36,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
||||
|
@ -159,4 +161,21 @@ public abstract class HATestUtil {
|
|||
public static String getLogicalHostname(MiniDFSCluster cluster) {
|
||||
return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId());
|
||||
}
|
||||
|
||||
public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx,
|
||||
List<Integer> txids) throws InterruptedException {
|
||||
long start = System.currentTimeMillis();
|
||||
while (true) {
|
||||
try {
|
||||
FSImageTestUtil.assertNNHasCheckpoints(cluster, nnIdx, txids);
|
||||
return;
|
||||
} catch (AssertionError err) {
|
||||
if (System.currentTimeMillis() - start > 10000) {
|
||||
throw err;
|
||||
} else {
|
||||
Thread.sleep(300);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,14 +30,14 @@ import static org.mockito.Mockito.times;
|
|||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URI;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.ha.ServiceFailedException;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
|
@ -46,100 +46,191 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
|
|||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil.CouldNotCatchUpException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
public class TestFailureToReadEdits {
|
||||
private static final String TEST_DIR1 = "/test1";
|
||||
private static final String TEST_DIR2 = "/test2";
|
||||
private static final String TEST_DIR3 = "/test3";
|
||||
|
||||
private Configuration conf;
|
||||
private Runtime mockRuntime = mock(Runtime.class);
|
||||
private MiniDFSCluster cluster;
|
||||
private NameNode nn0;
|
||||
private NameNode nn1;
|
||||
private FileSystem fs;
|
||||
|
||||
@Before
|
||||
public void setUpCluster() throws Exception {
|
||||
conf = new Configuration();
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY, 10);
|
||||
HAUtil.setAllowStandbyReads(conf, true);
|
||||
|
||||
MiniDFSNNTopology topology = new MiniDFSNNTopology()
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf(null)
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001))
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.nnTopology(topology)
|
||||
.numDataNodes(0)
|
||||
.build();
|
||||
|
||||
cluster.waitActive();
|
||||
|
||||
nn0 = cluster.getNameNode(0);
|
||||
nn1 = cluster.getNameNode(1);
|
||||
nn1.getNamesystem().getEditLogTailer().setSleepTime(250);
|
||||
nn1.getNamesystem().getEditLogTailer().interrupt();
|
||||
nn1.getNamesystem().getEditLogTailer().setRuntime(mockRuntime);
|
||||
|
||||
cluster.transitionToActive(0);
|
||||
fs = HATestUtil.configureFailoverFs(cluster, conf);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDownCluster() throws Exception {
|
||||
if (fs != null) {
|
||||
fs.close();
|
||||
}
|
||||
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the standby NN won't double-replay earlier edits if it encounters
|
||||
* a failure to read a later edit.
|
||||
*/
|
||||
@Test
|
||||
public void testFailuretoReadEdits() throws IOException,
|
||||
ServiceFailedException, URISyntaxException, InterruptedException {
|
||||
Configuration conf = new Configuration();
|
||||
HAUtil.setAllowStandbyReads(conf, true);
|
||||
public void testFailuretoReadEdits() throws Exception {
|
||||
assertTrue(fs.mkdirs(new Path(TEST_DIR1)));
|
||||
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
|
||||
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
||||
.numDataNodes(0)
|
||||
.build();
|
||||
// If these two ops are applied twice, the first op will throw an
|
||||
// exception the second time its replayed.
|
||||
fs.setOwner(new Path(TEST_DIR1), "foo", "bar");
|
||||
assertTrue(fs.delete(new Path(TEST_DIR1), true));
|
||||
|
||||
// This op should get applied just fine.
|
||||
assertTrue(fs.mkdirs(new Path(TEST_DIR2)));
|
||||
|
||||
// This is the op the mocking will cause to fail to be read.
|
||||
assertTrue(fs.mkdirs(new Path(TEST_DIR3)));
|
||||
|
||||
LimitedEditLogAnswer answer = causeFailureOnEditLogRead();
|
||||
|
||||
try {
|
||||
cluster.waitActive();
|
||||
cluster.transitionToActive(0);
|
||||
|
||||
Runtime mockRuntime = mock(Runtime.class);
|
||||
|
||||
NameNode nn1 = cluster.getNameNode(0);
|
||||
NameNode nn2 = cluster.getNameNode(1);
|
||||
nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
|
||||
nn2.getNamesystem().getEditLogTailer().interrupt();
|
||||
nn2.getNamesystem().getEditLogTailer().setRuntime(mockRuntime);
|
||||
|
||||
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
|
||||
fs.mkdirs(new Path(TEST_DIR1));
|
||||
HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
|
||||
|
||||
// If these two ops are applied twice, the first op will throw an
|
||||
// exception the second time its replayed.
|
||||
fs.setOwner(new Path(TEST_DIR1), "foo", "bar");
|
||||
fs.delete(new Path(TEST_DIR1), true);
|
||||
|
||||
// This op should get applied just fine.
|
||||
fs.mkdirs(new Path(TEST_DIR2));
|
||||
|
||||
// This is the op the mocking will cause to fail to be read.
|
||||
fs.mkdirs(new Path(TEST_DIR3));
|
||||
|
||||
FSEditLog spyEditLog = spy(nn2.getNamesystem().getEditLogTailer()
|
||||
.getEditLog());
|
||||
LimitedEditLogAnswer answer = new LimitedEditLogAnswer();
|
||||
doAnswer(answer).when(spyEditLog).selectInputStreams(
|
||||
anyLong(), anyLong(), anyBoolean());
|
||||
nn2.getNamesystem().getEditLogTailer().setEditLog(spyEditLog);
|
||||
|
||||
try {
|
||||
HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
|
||||
fail("Standby fully caught up, but should not have been able to");
|
||||
} catch (HATestUtil.CouldNotCatchUpException e) {
|
||||
verify(mockRuntime, times(0)).exit(anyInt());
|
||||
}
|
||||
|
||||
// Null because it was deleted.
|
||||
assertNull(NameNodeAdapter.getFileInfo(nn2,
|
||||
TEST_DIR1, false));
|
||||
// Should have been successfully created.
|
||||
assertTrue(NameNodeAdapter.getFileInfo(nn2,
|
||||
TEST_DIR2, false).isDir());
|
||||
// Null because it hasn't been created yet.
|
||||
assertNull(NameNodeAdapter.getFileInfo(nn2,
|
||||
TEST_DIR3, false));
|
||||
|
||||
// Now let the standby read ALL the edits.
|
||||
answer.setThrowExceptionOnRead(false);
|
||||
HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
|
||||
|
||||
// Null because it was deleted.
|
||||
assertNull(NameNodeAdapter.getFileInfo(nn2,
|
||||
TEST_DIR1, false));
|
||||
// Should have been successfully created.
|
||||
assertTrue(NameNodeAdapter.getFileInfo(nn2,
|
||||
TEST_DIR2, false).isDir());
|
||||
// Should now have been successfully created.
|
||||
assertTrue(NameNodeAdapter.getFileInfo(nn2,
|
||||
TEST_DIR3, false).isDir());
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
|
||||
fail("Standby fully caught up, but should not have been able to");
|
||||
} catch (HATestUtil.CouldNotCatchUpException e) {
|
||||
verify(mockRuntime, times(0)).exit(anyInt());
|
||||
}
|
||||
|
||||
// Null because it was deleted.
|
||||
assertNull(NameNodeAdapter.getFileInfo(nn1,
|
||||
TEST_DIR1, false));
|
||||
// Should have been successfully created.
|
||||
assertTrue(NameNodeAdapter.getFileInfo(nn1,
|
||||
TEST_DIR2, false).isDir());
|
||||
// Null because it hasn't been created yet.
|
||||
assertNull(NameNodeAdapter.getFileInfo(nn1,
|
||||
TEST_DIR3, false));
|
||||
|
||||
// Now let the standby read ALL the edits.
|
||||
answer.setThrowExceptionOnRead(false);
|
||||
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
|
||||
|
||||
// Null because it was deleted.
|
||||
assertNull(NameNodeAdapter.getFileInfo(nn1,
|
||||
TEST_DIR1, false));
|
||||
// Should have been successfully created.
|
||||
assertTrue(NameNodeAdapter.getFileInfo(nn1,
|
||||
TEST_DIR2, false).isDir());
|
||||
// Should now have been successfully created.
|
||||
assertTrue(NameNodeAdapter.getFileInfo(nn1,
|
||||
TEST_DIR3, false).isDir());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the following case:
|
||||
* 1. SBN is reading a finalized edits file when NFS disappears halfway
|
||||
* through (or some intermittent error happens)
|
||||
* 2. SBN performs a checkpoint and uploads it to the NN
|
||||
* 3. NN receives a checkpoint that doesn't correspond to the end of any log
|
||||
* segment
|
||||
* 4. Both NN and SBN should be able to restart at this point.
|
||||
*
|
||||
* This is a regression test for HDFS-2766.
|
||||
*/
|
||||
@Test
|
||||
public void testCheckpointStartingMidEditsFile() throws Exception {
|
||||
assertTrue(fs.mkdirs(new Path(TEST_DIR1)));
|
||||
|
||||
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
|
||||
|
||||
// Once the standby catches up, it should notice that it needs to
|
||||
// do a checkpoint and save one to its local directories.
|
||||
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 3));
|
||||
|
||||
// It should also upload it back to the active.
|
||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3));
|
||||
|
||||
causeFailureOnEditLogRead();
|
||||
|
||||
assertTrue(fs.mkdirs(new Path(TEST_DIR2)));
|
||||
assertTrue(fs.mkdirs(new Path(TEST_DIR3)));
|
||||
|
||||
try {
|
||||
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
|
||||
fail("Standby fully caught up, but should not have been able to");
|
||||
} catch (HATestUtil.CouldNotCatchUpException e) {
|
||||
verify(mockRuntime, times(0)).exit(anyInt());
|
||||
}
|
||||
|
||||
// 5 because we should get OP_START_LOG_SEGMENT and one successful OP_MKDIR
|
||||
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 3, 5));
|
||||
|
||||
// It should also upload it back to the active.
|
||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3, 5));
|
||||
|
||||
// Restart the active NN
|
||||
cluster.restartNameNode(0);
|
||||
|
||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3, 5));
|
||||
|
||||
FileSystem fs0 = null;
|
||||
try {
|
||||
// Make sure that when the active restarts, it loads all the edits.
|
||||
fs0 = FileSystem.get(NameNode.getUri(nn0.getNameNodeAddress()),
|
||||
conf);
|
||||
|
||||
assertTrue(fs0.exists(new Path(TEST_DIR1)));
|
||||
assertTrue(fs0.exists(new Path(TEST_DIR2)));
|
||||
assertTrue(fs0.exists(new Path(TEST_DIR3)));
|
||||
} finally {
|
||||
if (fs0 != null)
|
||||
fs0.close();
|
||||
}
|
||||
}
|
||||
|
||||
private LimitedEditLogAnswer causeFailureOnEditLogRead() throws IOException {
|
||||
FSEditLog spyEditLog = spy(nn1.getNamesystem().getEditLogTailer()
|
||||
.getEditLog());
|
||||
LimitedEditLogAnswer answer = new LimitedEditLogAnswer();
|
||||
doAnswer(answer).when(spyEditLog).selectInputStreams(
|
||||
anyLong(), anyLong(), anyBoolean());
|
||||
nn1.getNamesystem().getEditLogTailer().setEditLog(spyEditLog);
|
||||
|
||||
return answer;
|
||||
}
|
||||
|
||||
private static class LimitedEditLogAnswer
|
||||
|
|
|
@ -93,10 +93,10 @@ public class TestStandbyCheckpoints {
|
|||
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
|
||||
// Once the standby catches up, it should notice that it needs to
|
||||
// do a checkpoint and save one to its local directories.
|
||||
waitForCheckpoint(1, ImmutableList.of(0, 12));
|
||||
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 12));
|
||||
|
||||
// It should also upload it back to the active.
|
||||
waitForCheckpoint(0, ImmutableList.of(0, 12));
|
||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 12));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -118,8 +118,8 @@ public class TestStandbyCheckpoints {
|
|||
// so the standby will catch up. Then, both will be in standby mode
|
||||
// with enough uncheckpointed txns to cause a checkpoint, and they
|
||||
// will each try to take a checkpoint and upload to each other.
|
||||
waitForCheckpoint(1, ImmutableList.of(0, 12));
|
||||
waitForCheckpoint(0, ImmutableList.of(0, 12));
|
||||
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 12));
|
||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 12));
|
||||
|
||||
assertEquals(12, nn0.getNamesystem().getFSImage()
|
||||
.getMostRecentCheckpointTxId());
|
||||
|
@ -212,7 +212,6 @@ public class TestStandbyCheckpoints {
|
|||
assertTrue(StandbyCheckpointer.getCanceledCount() > 0);
|
||||
}
|
||||
|
||||
|
||||
private void doEdits(int start, int stop) throws IOException {
|
||||
for (int i = start; i < stop; i++) {
|
||||
Path p = new Path("/test" + i);
|
||||
|
@ -220,20 +219,4 @@ public class TestStandbyCheckpoints {
|
|||
}
|
||||
}
|
||||
|
||||
private void waitForCheckpoint(int nnIdx, List<Integer> txids)
|
||||
throws InterruptedException {
|
||||
long start = System.currentTimeMillis();
|
||||
while (true) {
|
||||
try {
|
||||
FSImageTestUtil.assertNNHasCheckpoints(cluster, nnIdx, txids);
|
||||
return;
|
||||
} catch (AssertionError err) {
|
||||
if (System.currentTimeMillis() - start > 10000) {
|
||||
throw err;
|
||||
} else {
|
||||
Thread.sleep(300);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue