HDFS-3605. Block mistakenly marked corrupt during edit log catchup phase of failover. Contributed by Todd Lipcon and Brahma Reddy Battula.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1363175 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-07-18 23:42:13 +00:00
parent 0a6806ce8c
commit 23b6ed973e
5 changed files with 133 additions and 11 deletions

View File

@ -506,6 +506,9 @@ Branch-2 ( Unreleased changes )
HDFS-3609. libhdfs: don't force the URI to look like hdfs://hostname:port.
(Colin Patrick McCabe via eli)
HDFS-3605. Block mistakenly marked corrupt during edit log catchup
phase of failover. (todd and Brahma Reddy Battula via todd)
BREAKDOWN OF HDFS-3042 SUBTASKS
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)

View File

@ -206,6 +206,14 @@ public class BlockManager {
/** variable to enable check for enough racks */
final boolean shouldCheckForEnoughRacks;
/**
* When running inside a Standby node, the node may receive block reports
* from datanodes before receiving the corresponding namespace edits from
* the active NameNode. Thus, it will postpone them for later processing,
* instead of marking the blocks as corrupt.
*/
private boolean shouldPostponeBlocksFromFuture = false;
/** for block replicas placement */
private BlockPlacementPolicy blockplacement;
@ -1014,6 +1022,12 @@ public class BlockManager {
}
}
public void setPostponeBlocksFromFuture(boolean postpone) {
this.shouldPostponeBlocksFromFuture = postpone;
}
private void postponeBlock(Block blk) {
if (postponedMisreplicatedBlocks.add(blk)) {
postponedMisreplicatedBlocksCount++;
@ -1590,13 +1604,11 @@ public class BlockManager {
assert (node.numBlocks() == 0);
BlockReportIterator itBR = report.getBlockReportIterator();
boolean isStandby = namesystem.isInStandbyState();
while(itBR.hasNext()) {
Block iblk = itBR.next();
ReplicaState reportedState = itBR.getCurrentReplicaState();
if (isStandby &&
if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(iblk.getGenerationStamp())) {
queueReportedBlock(node, iblk, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
@ -1612,7 +1624,7 @@ public class BlockManager {
BlockToMarkCorrupt c = checkReplicaCorrupt(
iblk, reportedState, storedBlock, ucState, node);
if (c != null) {
if (namesystem.isInStandbyState()) {
if (shouldPostponeBlocksFromFuture) {
// In the Standby, we may receive a block report for a file that we
// just have an out-of-date gen-stamp or state for, for example.
queueReportedBlock(node, iblk, reportedState,
@ -1718,7 +1730,7 @@ public class BlockManager {
+ " replicaState = " + reportedState);
}
if (namesystem.isInStandbyState() &&
if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(block.getGenerationStamp())) {
queueReportedBlock(dn, block, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
@ -1751,7 +1763,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
BlockToMarkCorrupt c = checkReplicaCorrupt(
block, reportedState, storedBlock, ucState, dn);
if (c != null) {
if (namesystem.isInStandbyState()) {
if (shouldPostponeBlocksFromFuture) {
// If the block is an out-of-date generation stamp or state,
// but we're the standby, we shouldn't treat it as corrupt,
// but instead just queue it for later processing.
@ -1784,7 +1796,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
*/
private void queueReportedBlock(DatanodeDescriptor dn, Block block,
ReplicaState reportedState, String reason) {
assert namesystem.isInStandbyState();
assert shouldPostponeBlocksFromFuture;
if (LOG.isDebugEnabled()) {
LOG.debug("Queueing reported block " + block +
@ -1827,9 +1839,9 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
* with the namespace information.
*/
public void processAllPendingDNMessages() throws IOException {
assert !namesystem.isInStandbyState() :
"processAllPendingDNMessages() should be called after exiting " +
"standby state!";
assert !shouldPostponeBlocksFromFuture :
"processAllPendingDNMessages() should be called after disabling " +
"block postponement.";
int count = pendingDNMessages.count();
if (count > 0) {
LOG.info("Processing " + count + " messages from DataNodes " +

View File

@ -623,6 +623,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
LOG.info("Catching up to latest edits from old active before " +
"taking over writer role in edits logs.");
editLogTailer.catchupDuringFailover();
blockManager.setPostponeBlocksFromFuture(false);
LOG.info("Reprocessing replication and invalidation queues...");
blockManager.getDatanodeManager().markAllDatanodesStale();
@ -706,6 +707,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// During startup, we're already open for read.
dir.fsImage.editLog.initSharedJournalsForRead();
}
blockManager.setPostponeBlocksFromFuture(true);
editLogTailer = new EditLogTailer(this, conf);
editLogTailer.start();
if (standbyShouldCheckpoint) {

View File

@ -179,7 +179,8 @@ public class EditLogTailer {
}
}
private void doTailEdits() throws IOException, InterruptedException {
@VisibleForTesting
void doTailEdits() throws IOException, InterruptedException {
// Write lock needs to be interruptible here because the
// transitionToActive RPC takes the write lock before calling
// tailer.stop() -- so if we're not interruptible, it will

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.tools.DFSck;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
public class TestHAAppend {
/**
* Test to verify the processing of PendingDataNodeMessageQueue in case of
* append. One block will marked as corrupt if the OP_ADD, OP_UPDATE_BLOCKS
* comes in one edit log segment and OP_CLOSE edit comes in next log segment
* which is loaded during failover. Regression test for HDFS-3605.
*/
@Test
public void testMultipleAppendsDuringCatchupTailing() throws Exception {
Configuration conf = new Configuration();
// Set a length edits tailing period, and explicit rolling, so we can
// control the ingest of edits by the standby for this test.
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "5000");
conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, -1);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(3).build();
FileSystem fs = null;
try {
cluster.transitionToActive(0);
fs = HATestUtil.configureFailoverFs(cluster, conf);
Path fileToAppend = new Path("/FileToAppend");
// Create file, write some data, and hflush so that the first
// block is in the edit log prior to roll.
FSDataOutputStream out = fs.create(fileToAppend);
out.writeBytes("/data");
out.hflush();
// Let the StandbyNode catch the creation of the file.
cluster.getNameNode(0).getRpcServer().rollEditLog();
cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits();
out.close();
// Append and re-close a few time, so that many block entries are queued.
for (int i = 0; i < 5; i++) {
DFSTestUtil.appendFile(fs, fileToAppend, "data");
}
// Ensure that blocks have been reported to the SBN ahead of the edits
// arriving.
cluster.triggerBlockReports();
// Failover the current standby to active.
cluster.shutdownNameNode(0);
cluster.transitionToActive(1);
// Check the FSCK doesn't detect any bad blocks on the SBN.
int rc = ToolRunner.run(new DFSck(cluster.getConfiguration(1)),
new String[] { "/", "-files", "-blocks" });
assertEquals(0, rc);
assertEquals("CorruptBlocks should be empty.", 0, cluster.getNameNode(1)
.getNamesystem().getCorruptReplicaBlocks());
} finally {
if (null != cluster) {
cluster.shutdown();
}
if (null != fs) {
fs.close();
}
}
}
}