HDFS-11755. Underconstruction blocks can be considered missing. Contributed by Nathan Roberts.

This commit is contained in:
Kihwal Lee 2017-05-10 14:15:57 -05:00 committed by Xiaoyu Yao
parent b7b758ab11
commit 070a4830f6
4 changed files with 68 additions and 9 deletions

View File

@ -4092,7 +4092,7 @@ public class BlockManager implements BlockStatsMXBean {
final int curReplicasDelta, int expectedReplicasDelta) { final int curReplicasDelta, int expectedReplicasDelta) {
namesystem.writeLock(); namesystem.writeLock();
try { try {
if (!isPopulatingReplQueues()) { if (!isPopulatingReplQueues() || !block.isComplete()) {
return; return;
} }
NumberReplicas repl = countNodes(block); NumberReplicas repl = countNodes(block);

View File

@ -100,7 +100,7 @@ import com.google.common.collect.Lists;
* caching directives, we will schedule caching and uncaching work. * caching directives, we will schedule caching and uncaching work.
*/ */
@InterfaceAudience.LimitedPrivate({"HDFS"}) @InterfaceAudience.LimitedPrivate({"HDFS"})
public final class CacheManager { public class CacheManager {
public static final Logger LOG = LoggerFactory.getLogger(CacheManager.class); public static final Logger LOG = LoggerFactory.getLogger(CacheManager.class);
private static final float MIN_CACHED_BLOCKS_PERCENT = 0.001f; private static final float MIN_CACHED_BLOCKS_PERCENT = 0.001f;

View File

@ -275,6 +275,7 @@ public class TestFileCorruption {
out.write(outBuffer, 0, bufferSize); out.write(outBuffer, 0, bufferSize);
out.close(); out.close();
dfs.setReplication(filePath, (short) 10); dfs.setReplication(filePath, (short) 10);
cluster.triggerBlockReports();
// underreplicated Blocks should be one after setrep // underreplicated Blocks should be one after setrep
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() { @Override public Boolean get() {

View File

@ -83,11 +83,15 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.namenode.CacheManager;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeId; import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.TestINodeFile; import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -102,6 +106,8 @@ import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts; import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.junit.Assert; import org.junit.Assert;
@ -146,7 +152,20 @@ public class TestBlockManager {
Mockito.doReturn(true).when(fsn).hasWriteLock(); Mockito.doReturn(true).when(fsn).hasWriteLock();
Mockito.doReturn(true).when(fsn).hasReadLock(); Mockito.doReturn(true).when(fsn).hasReadLock();
Mockito.doReturn(true).when(fsn).isRunning(); Mockito.doReturn(true).when(fsn).isRunning();
//Make shouldPopulaeReplQueues return true
HAContext haContext = Mockito.mock(HAContext.class);
HAState haState = Mockito.mock(HAState.class);
Mockito.when(haContext.getState()).thenReturn(haState);
Mockito.when(haState.shouldPopulateReplQueues()).thenReturn(true);
Mockito.when(fsn.getHAContext()).thenReturn(haContext);
bm = new BlockManager(fsn, false, conf); bm = new BlockManager(fsn, false, conf);
bm.setInitializedReplQueues(true);
CacheManager cm = Mockito.mock(CacheManager.class);
Mockito.doReturn(cm).when(fsn).getCacheManager();
GSet<CachedBlock, CachedBlock> cb =
new LightWeightGSet<CachedBlock, CachedBlock>(1);
Mockito.when(cm.getCachedBlocks()).thenReturn(cb);
final String[] racks = { final String[] racks = {
"/rackA", "/rackA",
"/rackA", "/rackA",
@ -522,7 +541,7 @@ public class TestBlockManager {
} }
return ret; return ret;
} }
private List<DatanodeDescriptor> startDecommission(int ... indexes) { private List<DatanodeDescriptor> startDecommission(int ... indexes) {
List<DatanodeDescriptor> nodes = getNodes(indexes); List<DatanodeDescriptor> nodes = getNodes(indexes);
for (DatanodeDescriptor node : nodes) { for (DatanodeDescriptor node : nodes) {
@ -918,6 +937,42 @@ public class TestBlockManager {
return builder.build(); return builder.build();
} }
@Test
public void testUCBlockNotConsideredMissing() throws Exception {
DatanodeDescriptor node = nodes.get(0);
DatanodeStorageInfo ds = node.getStorageInfos()[0];
node.setAlive(true);
DatanodeRegistration nodeReg =
new DatanodeRegistration(node, null, null, "");
// register new node
bm.getDatanodeManager().registerDatanode(nodeReg);
bm.getDatanodeManager().addDatanode(node);
// Build an incremental report
List<ReceivedDeletedBlockInfo> rdbiList = new ArrayList<>();
// blk_42 is under construction, finalizes on one node and is
// immediately deleted on same node
long blockId = 42; // arbitrary
BlockInfo receivedBlock = addUcBlockToBM(blockId);
rdbiList.add(new ReceivedDeletedBlockInfo(new Block(receivedBlock),
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null));
rdbiList.add(new ReceivedDeletedBlockInfo(
new Block(blockId),
ReceivedDeletedBlockInfo.BlockStatus.DELETED_BLOCK, null));
// process IBR
StorageReceivedDeletedBlocks srdb =
new StorageReceivedDeletedBlocks(new DatanodeStorage(ds.getStorageID()),
rdbiList.toArray(new ReceivedDeletedBlockInfo[rdbiList.size()]));
bm.processIncrementalBlockReport(node, srdb);
// Needed replications should still be 0.
assertEquals("UC block was incorrectly added to needed Replications",
0, bm.neededReconstruction.size());
}
private BlockInfo addBlockToBM(long blkId) { private BlockInfo addBlockToBM(long blkId) {
Block block = new Block(blkId); Block block = new Block(blkId);
BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3); BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);
@ -1250,14 +1305,17 @@ public class TestBlockManager {
FileInputStream fstream = new FileInputStream(file); FileInputStream fstream = new FileInputStream(file);
DataInputStream in = new DataInputStream(fstream); DataInputStream in = new DataInputStream(fstream);
BufferedReader reader = new BufferedReader(new InputStreamReader(in)); BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String corruptBlocksLine;
Boolean foundIt = false;
try { try {
for(int i =0;i<6;i++) { while ((corruptBlocksLine = reader.readLine()) != null) {
reader.readLine(); if (corruptBlocksLine.compareTo("Corrupt Blocks:") == 0) {
foundIt = true;
break;
}
} }
String corruptBlocksLine = reader.readLine(); assertTrue("Unexpected text in metasave," +
assertEquals("Unexpected text in metasave," + "was expecting corrupt blocks section!", foundIt);
"was expecting corrupt blocks section!", 0,
corruptBlocksLine.compareTo("Corrupt Blocks:"));
corruptBlocksLine = reader.readLine(); corruptBlocksLine = reader.readLine();
String regex = "Block=[0-9]+\\tNode=.*\\tStorageID=.*StorageState.*" + String regex = "Block=[0-9]+\\tNode=.*\\tStorageID=.*StorageState.*" +
"TotalReplicas=.*Reason=GENSTAMP_MISMATCH"; "TotalReplicas=.*Reason=GENSTAMP_MISMATCH";