HDFS-13840. RBW Blocks which are having less GS should be added to Corrupt. Contributed by Brahma Reddy Battula.
(cherry picked from commit e0ff8e2c10)
This commit is contained in:
parent
97b75f47fd
commit
02f8b5da47
|
@ -3072,10 +3072,20 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
}
|
}
|
||||||
case RBW:
|
case RBW:
|
||||||
case RWR:
|
case RWR:
|
||||||
|
final long reportedGS = reported.getGenerationStamp();
|
||||||
if (!storedBlock.isComplete()) {
|
if (!storedBlock.isComplete()) {
|
||||||
|
// When the DN reports a lower GS than the storedBlock, mark the replica as corrupt,
|
||||||
|
// since a valid replica will already be present.
|
||||||
|
if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
|
||||||
|
return new BlockToMarkCorrupt(new Block(reported), storedBlock,
|
||||||
|
reportedGS,
|
||||||
|
"reported " + reportedState + " replica with genstamp "
|
||||||
|
+ reportedGS
|
||||||
|
+ " does not match Stored block's genstamp in block map "
|
||||||
|
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
|
||||||
|
}
|
||||||
return null; // not corrupt
|
return null; // not corrupt
|
||||||
} else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
|
} else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
|
||||||
final long reportedGS = reported.getGenerationStamp();
|
|
||||||
return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS,
|
return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS,
|
||||||
"reported " + reportedState + " replica with genstamp " + reportedGS
|
"reported " + reportedState + " replica with genstamp " + reportedGS
|
||||||
+ " does not match COMPLETE block's genstamp in block map "
|
+ " does not match COMPLETE block's genstamp in block map "
|
||||||
|
@ -3135,8 +3145,11 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
block.getUnderConstructionFeature().addReplicaIfNotPresent(
|
block.getUnderConstructionFeature().addReplicaIfNotPresent(
|
||||||
storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
|
storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
|
||||||
|
|
||||||
if (ucBlock.reportedState == ReplicaState.FINALIZED &&
|
// Add replica if appropriate. If the replica was previously corrupt
|
||||||
(block.findStorageInfo(storageInfo) < 0)) {
|
// but now okay, it might need to be updated.
|
||||||
|
if (ucBlock.reportedState == ReplicaState.FINALIZED && (
|
||||||
|
block.findStorageInfo(storageInfo) < 0) || corruptReplicas
|
||||||
|
.isReplicaCorrupt(block, storageInfo.getDatanodeDescriptor())) {
|
||||||
addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true);
|
addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -559,6 +559,11 @@ public class MiniDFSCluster implements AutoCloseable {
|
||||||
public void setDnArgs(String ... args) {
|
public void setDnArgs(String ... args) {
|
||||||
dnArgs = args;
|
dnArgs = args;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public DataNode getDatanode() {
|
||||||
|
return datanode;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
|
import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -27,6 +28,7 @@ import java.util.List;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -244,6 +246,42 @@ public class TestRBWBlockInvalidation {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRWRShouldNotAddedOnDNRestart() throws Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable",
|
||||||
|
"false");
|
||||||
|
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(2).build()) {
|
||||||
|
Path path = new Path("/testRBW");
|
||||||
|
FSDataOutputStream out = cluster.getFileSystem().create(path, (short) 2);
|
||||||
|
out.writeBytes("old gs data\n");
|
||||||
|
out.hflush();
|
||||||
|
// stop one datanode
|
||||||
|
DataNodeProperties dnProp = cluster.stopDataNode(0);
|
||||||
|
String dnAddress = dnProp.getDatanode().getXferAddress().toString();
|
||||||
|
if (dnAddress.startsWith("/")) {
|
||||||
|
dnAddress = dnAddress.substring(1);
|
||||||
|
}
|
||||||
|
//Write some more data after DN stopped.
|
||||||
|
out.writeBytes("old gs data\n");
|
||||||
|
out.hflush();
|
||||||
|
cluster.restartDataNode(dnProp, true);
|
||||||
|
// wait till the block report comes
|
||||||
|
Thread.sleep(3000);
|
||||||
|
// check the block locations, this should not contain restarted datanode
|
||||||
|
BlockLocation[] locations = cluster.getFileSystem()
|
||||||
|
.getFileBlockLocations(path, 0, Long.MAX_VALUE);
|
||||||
|
String[] names = locations[0].getNames();
|
||||||
|
for (String node : names) {
|
||||||
|
if (node.equals(dnAddress)) {
|
||||||
|
fail("Old GS DN should not be present in latest block locations.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void waitForNumTotalBlocks(final MiniDFSCluster cluster,
|
private void waitForNumTotalBlocks(final MiniDFSCluster cluster,
|
||||||
final int numTotalBlocks) throws Exception {
|
final int numTotalBlocks) throws Exception {
|
||||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
|
|
@ -337,8 +337,8 @@ public class TestAddStripedBlocks {
|
||||||
int i = groupSize - 1;
|
int i = groupSize - 1;
|
||||||
for (DataNode dn : cluster.getDataNodes()) {
|
for (DataNode dn : cluster.getDataNodes()) {
|
||||||
String storageID = storageIDs.get(i);
|
String storageID = storageIDs.get(i);
|
||||||
final Block block = new Block(lastBlock.getBlockId() + i--,
|
final Block block = new Block(lastBlock.getBlockId() + i--, 0,
|
||||||
lastBlock.getGenerationStamp(), 0);
|
lastBlock.getGenerationStamp());
|
||||||
DatanodeStorage storage = new DatanodeStorage(storageID);
|
DatanodeStorage storage = new DatanodeStorage(storageID);
|
||||||
List<ReplicaBeingWritten> blocks = new ArrayList<>();
|
List<ReplicaBeingWritten> blocks = new ArrayList<>();
|
||||||
ReplicaBeingWritten replica = new ReplicaBeingWritten(block, null, null,
|
ReplicaBeingWritten replica = new ReplicaBeingWritten(block, null, null,
|
||||||
|
|
Loading…
Reference in New Issue