diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt index 55939fe34d4..9441e52958d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt @@ -99,3 +99,5 @@ HDFS-2753. Fix standby getting stuck in safemode when blocks are written while S HDFS-2773. Reading edit logs from an earlier version should not leave blocks in under-construction state. (todd) HDFS-2775. Fix TestStandbyCheckpoints.testBothNodesInStandbyState failing intermittently. (todd) + +HDFS-2766. Test for case where standby partially reads log and then performs checkpoint. (atm) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java index 06b8eff3fa9..3c6bec6cd5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java @@ -204,7 +204,13 @@ class FileJournalManager implements JournalManager { } EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(), elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress()); - elfis.skipTransactions(fromTxId - elf.getFirstTxId()); + long transactionsToSkip = fromTxId - elf.getFirstTxId(); + if (transactionsToSkip > 0) { + LOG.info(String.format("Log begins at txid %d, but requested start " + + "txid is %d. Skipping %d edits.", elf.getFirstTxId(), fromTxId, + transactionsToSkip)); + elfis.skipTransactions(transactionsToSkip); + } return elfis; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java index 2e4e932b386..f0b8a6d2b30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java @@ -33,7 +33,9 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -64,6 +66,8 @@ import static org.mockito.Mockito.mock; */ public abstract class FSImageTestUtil { + public static final Log LOG = LogFactory.getLog(FSImageTestUtil.class.getName()); + /** * The position in the fsimage header where the txid is * written. @@ -410,6 +414,8 @@ public abstract class FSImageTestUtil { for (File nameDir : getNameNodeCurrentDirs(cluster, nnIdx)) { // Should have fsimage_N for the three checkpoints + LOG.info("Examining storage dir " + nameDir + " with contents: " + + StringUtils.join(nameDir.listFiles(), ", ")); for (long checkpointTxId : txids) { File image = new File(nameDir, NNStorage.getImageFileName(checkpointTxId)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index 1b7b62dade7..ba05da82414 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.util.List; import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; @@ -36,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter; +import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.test.GenericTestUtils; @@ -159,4 +161,21 @@ public abstract class HATestUtil { public static String getLogicalHostname(MiniDFSCluster cluster) { return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId()); } + + public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx, + List txids) throws InterruptedException { + long start = System.currentTimeMillis(); + while (true) { + try { + FSImageTestUtil.assertNNHasCheckpoints(cluster, nnIdx, txids); + return; + } catch (AssertionError err) { + if (System.currentTimeMillis() - start > 10000) { + throw err; + } else { + Thread.sleep(300); + } + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java index 96a62960984..ca51b4eb9ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java @@ -30,14 +30,14 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.io.IOException; -import java.net.URISyntaxException; +import java.net.URI; import java.util.Collection; import java.util.LinkedList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; @@ -46,100 +46,191 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLog; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; -import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil.CouldNotCatchUpException; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import com.google.common.collect.ImmutableList; + public class TestFailureToReadEdits { private static final String TEST_DIR1 = "/test1"; private static final String TEST_DIR2 = "/test2"; private static final String TEST_DIR3 = "/test3"; + + private Configuration conf; + private Runtime mockRuntime = mock(Runtime.class); + private MiniDFSCluster cluster; + private NameNode nn0; + private NameNode nn1; + private FileSystem fs; + + @Before + public void setUpCluster() throws Exception { + conf = new Configuration(); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY, 10); + HAUtil.setAllowStandbyReads(conf, true); + + MiniDFSNNTopology topology = new MiniDFSNNTopology() + .addNameservice(new MiniDFSNNTopology.NSConf(null) + .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001)) + .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002))); + cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(topology) + .numDataNodes(0) + .build(); + + cluster.waitActive(); + + nn0 = cluster.getNameNode(0); + nn1 = cluster.getNameNode(1); + nn1.getNamesystem().getEditLogTailer().setSleepTime(250); + nn1.getNamesystem().getEditLogTailer().interrupt(); + nn1.getNamesystem().getEditLogTailer().setRuntime(mockRuntime); + + cluster.transitionToActive(0); + fs = HATestUtil.configureFailoverFs(cluster, conf); + } + + @After + public void tearDownCluster() throws Exception { + if (fs != null) { + fs.close(); + } + + if (cluster != null) { + cluster.shutdown(); + } + } /** * Test that the standby NN won't double-replay earlier edits if it encounters * a failure to read a later edit. */ @Test - public void testFailuretoReadEdits() throws IOException, - ServiceFailedException, URISyntaxException, InterruptedException { - Configuration conf = new Configuration(); - HAUtil.setAllowStandbyReads(conf, true); + public void testFailuretoReadEdits() throws Exception { + assertTrue(fs.mkdirs(new Path(TEST_DIR1))); + HATestUtil.waitForStandbyToCatchUp(nn0, nn1); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleHATopology()) - .numDataNodes(0) - .build(); + // If these two ops are applied twice, the first op will throw an + // exception the second time its replayed. + fs.setOwner(new Path(TEST_DIR1), "foo", "bar"); + assertTrue(fs.delete(new Path(TEST_DIR1), true)); + + // This op should get applied just fine. + assertTrue(fs.mkdirs(new Path(TEST_DIR2))); + + // This is the op the mocking will cause to fail to be read. + assertTrue(fs.mkdirs(new Path(TEST_DIR3))); + + LimitedEditLogAnswer answer = causeFailureOnEditLogRead(); try { - cluster.waitActive(); - cluster.transitionToActive(0); - - Runtime mockRuntime = mock(Runtime.class); - - NameNode nn1 = cluster.getNameNode(0); - NameNode nn2 = cluster.getNameNode(1); - nn2.getNamesystem().getEditLogTailer().setSleepTime(250); - nn2.getNamesystem().getEditLogTailer().interrupt(); - nn2.getNamesystem().getEditLogTailer().setRuntime(mockRuntime); - - FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); - fs.mkdirs(new Path(TEST_DIR1)); - HATestUtil.waitForStandbyToCatchUp(nn1, nn2); - - // If these two ops are applied twice, the first op will throw an - // exception the second time its replayed. - fs.setOwner(new Path(TEST_DIR1), "foo", "bar"); - fs.delete(new Path(TEST_DIR1), true); - - // This op should get applied just fine. - fs.mkdirs(new Path(TEST_DIR2)); - - // This is the op the mocking will cause to fail to be read. - fs.mkdirs(new Path(TEST_DIR3)); - - FSEditLog spyEditLog = spy(nn2.getNamesystem().getEditLogTailer() - .getEditLog()); - LimitedEditLogAnswer answer = new LimitedEditLogAnswer(); - doAnswer(answer).when(spyEditLog).selectInputStreams( - anyLong(), anyLong(), anyBoolean()); - nn2.getNamesystem().getEditLogTailer().setEditLog(spyEditLog); - - try { - HATestUtil.waitForStandbyToCatchUp(nn1, nn2); - fail("Standby fully caught up, but should not have been able to"); - } catch (HATestUtil.CouldNotCatchUpException e) { - verify(mockRuntime, times(0)).exit(anyInt()); - } - - // Null because it was deleted. - assertNull(NameNodeAdapter.getFileInfo(nn2, - TEST_DIR1, false)); - // Should have been successfully created. - assertTrue(NameNodeAdapter.getFileInfo(nn2, - TEST_DIR2, false).isDir()); - // Null because it hasn't been created yet. - assertNull(NameNodeAdapter.getFileInfo(nn2, - TEST_DIR3, false)); - - // Now let the standby read ALL the edits. - answer.setThrowExceptionOnRead(false); - HATestUtil.waitForStandbyToCatchUp(nn1, nn2); - - // Null because it was deleted. - assertNull(NameNodeAdapter.getFileInfo(nn2, - TEST_DIR1, false)); - // Should have been successfully created. - assertTrue(NameNodeAdapter.getFileInfo(nn2, - TEST_DIR2, false).isDir()); - // Should now have been successfully created. - assertTrue(NameNodeAdapter.getFileInfo(nn2, - TEST_DIR3, false).isDir()); - } finally { - if (cluster != null) { - cluster.shutdown(); - } + HATestUtil.waitForStandbyToCatchUp(nn0, nn1); + fail("Standby fully caught up, but should not have been able to"); + } catch (HATestUtil.CouldNotCatchUpException e) { + verify(mockRuntime, times(0)).exit(anyInt()); } + + // Null because it was deleted. + assertNull(NameNodeAdapter.getFileInfo(nn1, + TEST_DIR1, false)); + // Should have been successfully created. + assertTrue(NameNodeAdapter.getFileInfo(nn1, + TEST_DIR2, false).isDir()); + // Null because it hasn't been created yet. + assertNull(NameNodeAdapter.getFileInfo(nn1, + TEST_DIR3, false)); + + // Now let the standby read ALL the edits. + answer.setThrowExceptionOnRead(false); + HATestUtil.waitForStandbyToCatchUp(nn0, nn1); + + // Null because it was deleted. + assertNull(NameNodeAdapter.getFileInfo(nn1, + TEST_DIR1, false)); + // Should have been successfully created. + assertTrue(NameNodeAdapter.getFileInfo(nn1, + TEST_DIR2, false).isDir()); + // Should now have been successfully created. + assertTrue(NameNodeAdapter.getFileInfo(nn1, + TEST_DIR3, false).isDir()); + } + + /** + * Test the following case: + * 1. SBN is reading a finalized edits file when NFS disappears halfway + * through (or some intermittent error happens) + * 2. SBN performs a checkpoint and uploads it to the NN + * 3. NN receives a checkpoint that doesn't correspond to the end of any log + * segment + * 4. Both NN and SBN should be able to restart at this point. + * + * This is a regression test for HDFS-2766. + */ + @Test + public void testCheckpointStartingMidEditsFile() throws Exception { + assertTrue(fs.mkdirs(new Path(TEST_DIR1))); + + HATestUtil.waitForStandbyToCatchUp(nn0, nn1); + + // Once the standby catches up, it should notice that it needs to + // do a checkpoint and save one to its local directories. + HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 3)); + + // It should also upload it back to the active. + HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3)); + + causeFailureOnEditLogRead(); + + assertTrue(fs.mkdirs(new Path(TEST_DIR2))); + assertTrue(fs.mkdirs(new Path(TEST_DIR3))); + + try { + HATestUtil.waitForStandbyToCatchUp(nn0, nn1); + fail("Standby fully caught up, but should not have been able to"); + } catch (HATestUtil.CouldNotCatchUpException e) { + verify(mockRuntime, times(0)).exit(anyInt()); + } + + // 5 because we should get OP_START_LOG_SEGMENT and one successful OP_MKDIR + HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 3, 5)); + + // It should also upload it back to the active. + HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3, 5)); + + // Restart the active NN + cluster.restartNameNode(0); + + HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3, 5)); + + FileSystem fs0 = null; + try { + // Make sure that when the active restarts, it loads all the edits. + fs0 = FileSystem.get(NameNode.getUri(nn0.getNameNodeAddress()), + conf); + + assertTrue(fs0.exists(new Path(TEST_DIR1))); + assertTrue(fs0.exists(new Path(TEST_DIR2))); + assertTrue(fs0.exists(new Path(TEST_DIR3))); + } finally { + if (fs0 != null) + fs0.close(); + } + } + + private LimitedEditLogAnswer causeFailureOnEditLogRead() throws IOException { + FSEditLog spyEditLog = spy(nn1.getNamesystem().getEditLogTailer() + .getEditLog()); + LimitedEditLogAnswer answer = new LimitedEditLogAnswer(); + doAnswer(answer).when(spyEditLog).selectInputStreams( + anyLong(), anyLong(), anyBoolean()); + nn1.getNamesystem().getEditLogTailer().setEditLog(spyEditLog); + + return answer; } private static class LimitedEditLogAnswer diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java index b02ac5cdac2..83f077c55d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java @@ -93,10 +93,10 @@ public class TestStandbyCheckpoints { HATestUtil.waitForStandbyToCatchUp(nn0, nn1); // Once the standby catches up, it should notice that it needs to // do a checkpoint and save one to its local directories. - waitForCheckpoint(1, ImmutableList.of(0, 12)); + HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 12)); // It should also upload it back to the active. - waitForCheckpoint(0, ImmutableList.of(0, 12)); + HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 12)); } /** @@ -118,8 +118,8 @@ public class TestStandbyCheckpoints { // so the standby will catch up. Then, both will be in standby mode // with enough uncheckpointed txns to cause a checkpoint, and they // will each try to take a checkpoint and upload to each other. - waitForCheckpoint(1, ImmutableList.of(0, 12)); - waitForCheckpoint(0, ImmutableList.of(0, 12)); + HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 12)); + HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 12)); assertEquals(12, nn0.getNamesystem().getFSImage() .getMostRecentCheckpointTxId()); @@ -211,7 +211,6 @@ public class TestStandbyCheckpoints { assertTrue(StandbyCheckpointer.getCanceledCount() > 0); } - private void doEdits(int start, int stop) throws IOException { for (int i = start; i < stop; i++) { @@ -220,20 +219,4 @@ public class TestStandbyCheckpoints { } } - private void waitForCheckpoint(int nnIdx, List txids) - throws InterruptedException { - long start = System.currentTimeMillis(); - while (true) { - try { - FSImageTestUtil.assertNNHasCheckpoints(cluster, nnIdx, txids); - return; - } catch (AssertionError err) { - if (System.currentTimeMillis() - start > 10000) { - throw err; - } else { - Thread.sleep(300); - } - } - } - } }