HDFS-13840. RBW Blocks which are having less GS should be added to Corrupt. Contributed by Brahma Reddy Battula

This commit is contained in:
Brahma Reddy Battula 2018-09-26 23:44:16 +05:30
parent 6275b42870
commit e0ff8e2c10
4 changed files with 61 additions and 5 deletions

View File

@ -3086,10 +3086,20 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
}
case RBW:
case RWR:
final long reportedGS = reported.getGenerationStamp();
if (!storedBlock.isComplete()) {
//When the DN reports a lower GS than the storedBlock, mark the replica as
//corrupt, since a valid replica will already be present.
if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
return new BlockToMarkCorrupt(new Block(reported), storedBlock,
reportedGS,
"reported " + reportedState + " replica with genstamp "
+ reportedGS
+ " does not match Stored block's genstamp in block map "
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
}
return null; // not corrupt
} else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
final long reportedGS = reported.getGenerationStamp();
return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS,
"reported " + reportedState + " replica with genstamp " + reportedGS
+ " does not match COMPLETE block's genstamp in block map "
@ -3149,8 +3159,11 @@ void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
block.getUnderConstructionFeature().addReplicaIfNotPresent(
storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
if (ucBlock.reportedState == ReplicaState.FINALIZED &&
(block.findStorageInfo(storageInfo) < 0)) {
// Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated.
if (ucBlock.reportedState == ReplicaState.FINALIZED && (
block.findStorageInfo(storageInfo) < 0) || corruptReplicas
.isReplicaCorrupt(block, storageInfo.getDatanodeDescriptor())) {
addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true);
}
}

View File

@ -560,6 +560,11 @@ public class DataNodeProperties {
/**
 * Replaces the stored {@code dnArgs} with the supplied arguments.
 * The varargs array is stored by reference; no copy is made.
 *
 * @param args values to assign to {@code dnArgs} (presumably the DataNode
 *     startup arguments; confirm against callers)
 */
public void setDnArgs(String ... args) {
dnArgs = args;
}
/**
 * Returns the {@code datanode} reference.
 *
 * @return the current value of {@code datanode}
 */
public DataNode getDatanode() {
return datanode;
}
}
private Configuration conf;

View File

@ -19,6 +19,7 @@
import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.Closeable;
import java.io.IOException;
@ -27,6 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -245,6 +247,42 @@ public void testRWRInvalidation() throws Exception {
}
/**
 * Checks that a DataNode restarted in the middle of a write is not listed
 * among the locations of the file's first block.
 *
 * <p>Steps: start a two-DataNode cluster with client-side DataNode
 * replacement on failure disabled, create a file with replication 2, write
 * and hflush, stop DataNode 0, write and hflush again, restart the stopped
 * DataNode, wait, then assert that the restarted DataNode's transfer address
 * is absent from the block locations. The replica on the restarted DataNode
 * presumably carries the older generation stamp (see the failure message).
 *
 * @throws Exception if cluster setup, any write, or the location lookup fails
 */
@Test
public void testRWRShouldNotAddedOnDNRestart() throws Exception {
  Configuration conf = new HdfsConfiguration();
  conf.set("dfs.client.block.write.replace-datanode-on-failure.enable",
      "false");
  try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(2).build()) {
    Path path = new Path("/testRBW");
    // The stream is a managed resource so it is closed even when an
    // assertion below fails; it is closed before the cluster shuts down,
    // which matches the original success-path ordering.
    try (FSDataOutputStream out =
        cluster.getFileSystem().create(path, (short) 2)) {
      out.writeBytes("old gs data\n");
      out.hflush();
      // stop one datanode
      DataNodeProperties dnProp = cluster.stopDataNode(0);
      String dnAddress = dnProp.getDatanode().getXferAddress().toString();
      // Drop the leading '/' so the address can be compared with the names
      // returned by BlockLocation#getNames().
      if (dnAddress.startsWith("/")) {
        dnAddress = dnAddress.substring(1);
      }
      //Write some more data after DN stopped.
      out.writeBytes("old gs data\n");
      out.hflush();
      cluster.restartDataNode(dnProp, true);
      // wait till the block report comes
      // NOTE(review): fixed delay; presumably long enough for the restarted
      // DN's block report to be processed - confirm.
      Thread.sleep(3000);
      // check the block locations, this should not contain restarted datanode
      BlockLocation[] locations = cluster.getFileSystem()
          .getFileBlockLocations(path, 0, Long.MAX_VALUE);
      String[] names = locations[0].getNames();
      for (String node : names) {
        if (node.equals(dnAddress)) {
          fail("Old GS DN should not be present in latest block locations.");
        }
      }
    }
  }
}
private void waitForNumTotalBlocks(final MiniDFSCluster cluster,
final int numTotalBlocks) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {

View File

@ -337,8 +337,8 @@ public void testAddUCReplica() throws Exception {
int i = groupSize - 1;
for (DataNode dn : cluster.getDataNodes()) {
String storageID = storageIDs.get(i);
final Block block = new Block(lastBlock.getBlockId() + i--,
lastBlock.getGenerationStamp(), 0);
final Block block = new Block(lastBlock.getBlockId() + i--, 0,
lastBlock.getGenerationStamp());
DatanodeStorage storage = new DatanodeStorage(storageID);
List<ReplicaBeingWritten> blocks = new ArrayList<>();
ReplicaBeingWritten replica = new ReplicaBeingWritten(block, null, null,