diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 48f75e4e164..d0d7a6a7ecd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -506,6 +506,9 @@ Branch-2 ( Unreleased changes ) HDFS-3609. libhdfs: don't force the URI to look like hdfs://hostname:port. (Colin Patrick McCabe via eli) + HDFS-3605. Block mistakenly marked corrupt during edit log catchup + phase of failover. (todd and Brahma Reddy Battula via todd) + BREAKDOWN OF HDFS-3042 SUBTASKS HDFS-2185. HDFS portion of ZK-based FailoverController (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 04504f2ae8b..7b80a4f054e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -206,6 +206,14 @@ public class BlockManager { /** variable to enable check for enough racks */ final boolean shouldCheckForEnoughRacks; + /** + * When running inside a Standby node, the node may receive block reports + * from datanodes before receiving the corresponding namespace edits from + * the active NameNode. Thus, it will postpone them for later processing, + * instead of marking the blocks as corrupt. + */ + private boolean shouldPostponeBlocksFromFuture = false; + /** for block replicas placement */ private BlockPlacementPolicy blockplacement; @@ -1014,6 +1022,12 @@ public class BlockManager { } } + + public void setPostponeBlocksFromFuture(boolean postpone) { + this.shouldPostponeBlocksFromFuture = postpone; + } + + private void postponeBlock(Block blk) { if (postponedMisreplicatedBlocks.add(blk)) { postponedMisreplicatedBlocksCount++; @@ -1590,13 +1604,11 @@ public class BlockManager { assert (node.numBlocks() == 0); BlockReportIterator itBR = report.getBlockReportIterator(); - boolean isStandby = namesystem.isInStandbyState(); - while(itBR.hasNext()) { Block iblk = itBR.next(); ReplicaState reportedState = itBR.getCurrentReplicaState(); - if (isStandby && + if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(iblk.getGenerationStamp())) { queueReportedBlock(node, iblk, reportedState, QUEUE_REASON_FUTURE_GENSTAMP); @@ -1612,7 +1624,7 @@ public class BlockManager { BlockToMarkCorrupt c = checkReplicaCorrupt( iblk, reportedState, storedBlock, ucState, node); if (c != null) { - if (namesystem.isInStandbyState()) { + if (shouldPostponeBlocksFromFuture) { // In the Standby, we may receive a block report for a file that we // just have an out-of-date gen-stamp or state for, for example. queueReportedBlock(node, iblk, reportedState, @@ -1718,7 +1730,7 @@ public class BlockManager { + " replicaState = " + reportedState); } - if (namesystem.isInStandbyState() && + if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(block.getGenerationStamp())) { queueReportedBlock(dn, block, reportedState, QUEUE_REASON_FUTURE_GENSTAMP); @@ -1751,7 +1763,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block BlockToMarkCorrupt c = checkReplicaCorrupt( block, reportedState, storedBlock, ucState, dn); if (c != null) { - if (namesystem.isInStandbyState()) { + if (shouldPostponeBlocksFromFuture) { // If the block is an out-of-date generation stamp or state, // but we're the standby, we shouldn't treat it as corrupt, // but instead just queue it for later processing. @@ -1784,7 +1796,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block */ private void queueReportedBlock(DatanodeDescriptor dn, Block block, ReplicaState reportedState, String reason) { - assert namesystem.isInStandbyState(); + assert shouldPostponeBlocksFromFuture; if (LOG.isDebugEnabled()) { LOG.debug("Queueing reported block " + block + @@ -1827,9 +1839,9 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block * with the namespace information. */ public void processAllPendingDNMessages() throws IOException { - assert !namesystem.isInStandbyState() : - "processAllPendingDNMessages() should be called after exiting " + - "standby state!"; + assert !shouldPostponeBlocksFromFuture : + "processAllPendingDNMessages() should be called after disabling " + + "block postponement."; int count = pendingDNMessages.count(); if (count > 0) { LOG.info("Processing " + count + " messages from DataNodes " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 571bde80b74..9b63f5a1d8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -623,6 +623,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, LOG.info("Catching up to latest edits from old active before " + "taking over writer role in edits logs."); editLogTailer.catchupDuringFailover(); + blockManager.setPostponeBlocksFromFuture(false); LOG.info("Reprocessing replication and invalidation queues..."); blockManager.getDatanodeManager().markAllDatanodesStale(); @@ -706,6 +707,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, // During startup, we're already open for read. dir.fsImage.editLog.initSharedJournalsForRead(); } + + blockManager.setPostponeBlocksFromFuture(true); + editLogTailer = new EditLogTailer(this, conf); editLogTailer.start(); if (standbyShouldCheckpoint) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index e2e65d41444..a403706bbfe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -179,7 +179,8 @@ public class EditLogTailer { } } - private void doTailEdits() throws IOException, InterruptedException { + @VisibleForTesting + void doTailEdits() throws IOException, InterruptedException { // Write lock needs to be interruptible here because the // transitionToActive RPC takes the write lock before calling // tailer.stop() -- so if we're not interruptible, it will diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAAppend.java new file mode 100644 index 00000000000..9aa01221b3e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAAppend.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.apache.hadoop.hdfs.tools.DFSck; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Test; + +public class TestHAAppend { + + /** + * Test to verify the processing of PendingDataNodeMessageQueue in case of + * append. One block will marked as corrupt if the OP_ADD, OP_UPDATE_BLOCKS + * comes in one edit log segment and OP_CLOSE edit comes in next log segment + * which is loaded during failover. Regression test for HDFS-3605. + */ + @Test + public void testMultipleAppendsDuringCatchupTailing() throws Exception { + Configuration conf = new Configuration(); + + // Set a length edits tailing period, and explicit rolling, so we can + // control the ingest of edits by the standby for this test. + conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "5000"); + conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, -1); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(3).build(); + FileSystem fs = null; + try { + cluster.transitionToActive(0); + fs = HATestUtil.configureFailoverFs(cluster, conf); + + Path fileToAppend = new Path("/FileToAppend"); + + // Create file, write some data, and hflush so that the first + // block is in the edit log prior to roll. + FSDataOutputStream out = fs.create(fileToAppend); + out.writeBytes("/data"); + out.hflush(); + + // Let the StandbyNode catch the creation of the file. + cluster.getNameNode(0).getRpcServer().rollEditLog(); + cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits(); + out.close(); + + // Append and re-close a few time, so that many block entries are queued. + for (int i = 0; i < 5; i++) { + DFSTestUtil.appendFile(fs, fileToAppend, "data"); + } + + // Ensure that blocks have been reported to the SBN ahead of the edits + // arriving. + cluster.triggerBlockReports(); + + // Failover the current standby to active. + cluster.shutdownNameNode(0); + cluster.transitionToActive(1); + + // Check the FSCK doesn't detect any bad blocks on the SBN. + int rc = ToolRunner.run(new DFSck(cluster.getConfiguration(1)), + new String[] { "/", "-files", "-blocks" }); + assertEquals(0, rc); + + assertEquals("CorruptBlocks should be empty.", 0, cluster.getNameNode(1) + .getNamesystem().getCorruptReplicaBlocks()); + } finally { + if (null != cluster) { + cluster.shutdown(); + } + if (null != fs) { + fs.close(); + } + } + } +} \ No newline at end of file