HDFS-4799. Corrupt replica can be prematurely removed from corruptReplicas map. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1481084 13f79535-47bb-0310-9956-ffa450edef68
parent a96780013e
commit 1161ceb2c9
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -978,6 +978,9 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-4810. several HDFS HA tests have timeouts that are too short. (Chris
     Nauroth via atm)
 
+    HDFS-4799. Corrupt replica can be prematurely removed from
+    corruptReplicas map. (todd via kihwal)
+
 Release 2.0.4-alpha - 2013-04-25
 
   INCOMPATIBLE CHANGES
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1031,8 +1031,10 @@ public class BlockManager {
   /**
    * Invalidates the given block on the given datanode.
+   * @return true if the block was successfully invalidated and no longer
+   *         present in the BlocksMap
    */
-  private void invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
+  private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
       ) throws IOException {
     blockLog.info("BLOCK* invalidateBlock: " + b + " on " + dn);
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
@@ -1049,7 +1051,7 @@ public class BlockManager {
           nr.replicasOnStaleNodes() + " replica(s) are located on nodes " +
           "with potentially out-of-date block reports");
       postponeBlock(b.corrupted);
+      return false;
     } else if (nr.liveReplicas() >= 1) {
       // If we have at least one copy on a live node, then we can delete it.
       addToInvalidates(b.corrupted, dn);
@@ -1058,9 +1060,11 @@
         blockLog.debug("BLOCK* invalidateBlocks: "
             + b + " on " + dn + " listed for deletion.");
       }
+      return true;
     } else {
       blockLog.info("BLOCK* invalidateBlocks: " + b
           + " on " + dn + " is the only copy and was not deleted");
+      return false;
     }
   }
 
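Taken together, the edits above give invalidateBlock an explicit contract: the
returned boolean now tells the caller whether the replica actually left the
BlocksMap. A condensed, self-contained restatement of that decision logic
(hypothetical names, for illustration only, not the BlockManager source):

public class InvalidateOutcomeSketch {
  enum Outcome { POSTPONED, INVALIDATED, KEPT_ONLY_COPY }

  static Outcome decide(int replicasOnStaleNodes, int liveReplicas) {
    if (replicasOnStaleNodes > 0) {
      // Block reports may be out of date; defer the decision ("return false").
      return Outcome.POSTPONED;
    } else if (liveReplicas >= 1) {
      // A healthy copy exists elsewhere, so this one is safe to delete
      // ("return true").
      return Outcome.INVALIDATED;
    } else {
      // Last remaining copy; refuse to delete it ("return false").
      return Outcome.KEPT_ONLY_COPY;
    }
  }
}

Both false outcomes leave the replica in place, which is exactly the state the
caller below must not forget about.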
@@ -2212,7 +2216,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
    */
   private void invalidateCorruptReplicas(BlockInfo blk) {
     Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
-    boolean gotException = false;
+    boolean removedFromBlocksMap = true;
     if (nodes == null)
       return;
     // make a copy of the array of nodes in order to avoid
@@ -2220,16 +2224,19 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
     for (DatanodeDescriptor node : nodesCopy) {
       try {
-        invalidateBlock(new BlockToMarkCorrupt(blk, null), node);
+        if (!invalidateBlock(new BlockToMarkCorrupt(blk, null), node)) {
+          removedFromBlocksMap = false;
+        }
       } catch (IOException e) {
         blockLog.info("invalidateCorruptReplicas "
             + "error in deleting bad block " + blk + " on " + node, e);
-        gotException = true;
+        removedFromBlocksMap = false;
       }
     }
     // Remove the block from corruptReplicasMap
-    if (!gotException)
+    if (removedFromBlocksMap) {
       corruptReplicas.removeFromCorruptReplicasMap(blk);
+    }
   }
 
   /**
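This caller is where the premature removal happened: the old gotException flag
only tracked IOExceptions, so an invalidation that was merely postponed (or
refused because the corrupt replica was the only copy) still cleared the
corruptReplicas entry. A minimal runnable sketch of the corrected
all-or-nothing bookkeeping, using simplified stand-in types rather than the
Hadoop classes:

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CorruptReplicaBookkeepingSketch {
  final Set<String> corruptReplicas = new HashSet<>();

  // Stand-in for BlockManager.invalidateBlock: true only when the replica
  // was actually queued for deletion (neither postponed nor the only copy).
  boolean invalidateBlock(String block, String node) throws IOException {
    return !node.endsWith("-stale");
  }

  void invalidateCorruptReplicas(String block, List<String> nodes) {
    boolean removedFromBlocksMap = true;  // assume success until disproven
    for (String node : nodes) {
      try {
        if (!invalidateBlock(block, node)) {
          removedFromBlocksMap = false;   // postponed or refused: remember it
        }
      } catch (IOException e) {
        removedFromBlocksMap = false;     // same treatment as before the patch
      }
    }
    if (removedFromBlocksMap) {
      corruptReplicas.remove(block);      // safe only when every copy is gone
    }
  }

  public static void main(String[] args) {
    CorruptReplicaBookkeepingSketch bm = new CorruptReplicaBookkeepingSketch();
    bm.corruptReplicas.add("blk_1");
    // One datanode has a potentially stale block report, so the entry must
    // survive; the pre-patch logic would have dropped it here.
    bm.invalidateCorruptReplicas("blk_1", Arrays.asList("dn1", "dn2-stale"));
    System.out.println(bm.corruptReplicas);  // prints [blk_1]
  }
}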
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java
@@ -20,9 +20,13 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,18 +35,26 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.hdfs.server.namenode.ha.TestDNFencing.RandomDeleterPolicy;
+import org.apache.hadoop.io.IOUtils;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 /**
  * Test when RBW block is removed. Invalidation of the corrupted block happens
  * and then the under replicated block gets replicated to the datanode.
  */
 public class TestRBWBlockInvalidation {
+  private static Log LOG = LogFactory.getLog(TestRBWBlockInvalidation.class);
+
   private static NumberReplicas countReplicas(final FSNamesystem namesystem,
       ExtendedBlock block) {
     return namesystem.getBlockManager().countNodes(block.getLocalBlock());
@@ -125,4 +137,101 @@ public class TestRBWBlockInvalidation {
       cluster.shutdown();
     }
   }
+
+  /**
+   * Regression test for HDFS-4799, a case where, upon restart, if there
+   * were RWR replicas with out-of-date genstamps, the NN could accidentally
+   * delete good replicas instead of the bad replicas.
+   */
+  @Test(timeout=60000)
+  public void testRWRInvalidation() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+
+    // Set the deletion policy to be randomized rather than the default.
+    // The default is based on disk space, which isn't controllable
+    // in the context of the test, whereas a random one is more accurate
+    // to what is seen in real clusters (nodes have random amounts of free
+    // space)
+    conf.setClass("dfs.block.replicator.classname", RandomDeleterPolicy.class,
+        BlockPlacementPolicy.class);
+
+    // Speed up the test a bit with faster heartbeats.
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+
+    // Test with a bunch of separate files, since otherwise the test may
+    // fail just due to "good luck", even if a bug is present.
+    List<Path> testPaths = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      testPaths.add(new Path("/test" + i));
+    }
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .build();
+    try {
+      List<FSDataOutputStream> streams = Lists.newArrayList();
+      try {
+        // Open the test files and write some data to each
+        for (Path path : testPaths) {
+          FSDataOutputStream out = cluster.getFileSystem().create(path, (short)2);
+          streams.add(out);
+
+          out.writeBytes("old gs data\n");
+          out.hflush();
+        }
+
+
+        // Shutdown one of the nodes in the pipeline
+        DataNodeProperties oldGenstampNode = cluster.stopDataNode(0);
+
+        // Write some more data and flush again. This data will only
+        // be in the latter genstamp copy of the blocks.
+        for (int i = 0; i < streams.size(); i++) {
+          Path path = testPaths.get(i);
+          FSDataOutputStream out = streams.get(i);
+
+          out.writeBytes("new gs data\n");
+          out.hflush();
+
+          // Set replication so that only one node is necessary for this block,
+          // and close it.
+          cluster.getFileSystem().setReplication(path, (short)1);
+          out.close();
+        }
+
+        // Upon restart, there will be two replicas, one with an old genstamp
+        // and one current copy. This test wants to ensure that the old genstamp
+        // copy is the one that is deleted.
+
+        LOG.info("=========================== restarting cluster");
+        DataNodeProperties otherNode = cluster.stopDataNode(0);
+        cluster.restartNameNode();
+
+        // Restart the datanode with the corrupt replica first.
+        cluster.restartDataNode(oldGenstampNode);
+        cluster.waitActive();
+
+        // Then the other node
+        cluster.restartDataNode(otherNode);
+        cluster.waitActive();
+
+        // Compute and send invalidations, waiting until they're fully processed.
+        cluster.getNameNode().getNamesystem().getBlockManager()
+            .computeInvalidateWork(2);
+        cluster.triggerHeartbeats();
+        HATestUtil.waitForDNDeletions(cluster);
+        cluster.triggerDeletionReports();
+
+        // Make sure we can still read the blocks.
+        for (Path path : testPaths) {
+          String ret = DFSTestUtil.readFile(cluster.getFileSystem(), path);
+          assertEquals("old gs data\n" + "new gs data\n", ret);
+        }
+      } finally {
+        IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+      }
+    } finally {
+      cluster.shutdown();
+    }
+
+  }
 }
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -91,7 +91,7 @@ public abstract class HATestUtil {
    * Wait for the datanodes in the cluster to process any block
    * deletions that have already been asynchronously queued.
    */
-  static void waitForDNDeletions(final MiniDFSCluster cluster)
+  public static void waitForDNDeletions(final MiniDFSCluster cluster)
       throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
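waitForDNDeletions becomes public because the new regression test lives in
org.apache.hadoop.hdfs.server.blockmanagement, outside this class's package.
Internally the method spins on GenericTestUtils.waitFor, a poll-until-true
helper built on a Guava Supplier. A sketch of that pattern (a hypothetical
re-implementation for illustration, not Hadoop's GenericTestUtils):

import java.util.concurrent.TimeoutException;

import com.google.common.base.Supplier;

public class WaitForSketch {
  static void waitFor(Supplier<Boolean> check, long intervalMs, long timeoutMs)
      throws TimeoutException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!check.get()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException(
            "Condition not met within " + timeoutMs + " ms");
      }
      Thread.sleep(intervalMs);  // back off between checks
    }
  }
}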