HDFS-8404. Pending block replication can get stuck using older genstamp. Contributed by Nathan Roberts.

(cherry picked from commit 8860e352c3)

(cherry picked from commit 536b9ee6d6)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java

(cherry picked from commit 2d5e60fa12a62463cd54f1b6b0fcb2ccdbd82c42)
This commit is contained in:
Kihwal Lee 2015-05-19 13:06:48 -05:00 committed by Vinod Kumar Vavilapalli
parent b3ca4dfaff
commit fb096b97cc
3 changed files with 108 additions and 10 deletions

View File

@ -141,6 +141,9 @@ Release 2.6.1 - UNRELEASED
HDFS-8254. Standby namenode doesn't process DELETED_BLOCK if the add block HDFS-8254. Standby namenode doesn't process DELETED_BLOCK if the add block
request is in edit log. (Rushabh S Shah via kihwal) request is in edit log. (Rushabh S Shah via kihwal)
HDFS-8404. Pending block replication can get stuck using older genstamp
(Nathan Roberts via kihwal)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -1695,13 +1695,18 @@ public class BlockManager {
namesystem.writeLock(); namesystem.writeLock();
try { try {
for (int i = 0; i < timedOutItems.length; i++) { for (int i = 0; i < timedOutItems.length; i++) {
/*
* Use the blockinfo from the blocksmap to be certain we're working
* with the most up-to-date block information (e.g. genstamp).
*/
BlockInfo bi = blocksMap.getStoredBlock(timedOutItems[i]);
if (bi == null) {
continue;
}
NumberReplicas num = countNodes(timedOutItems[i]); NumberReplicas num = countNodes(timedOutItems[i]);
if (isNeededReplication(timedOutItems[i], getReplication(timedOutItems[i]), if (isNeededReplication(bi, getReplication(bi), num.liveReplicas())) {
num.liveReplicas())) { neededReplications.add(bi, num.liveReplicas(),
neededReplications.add(timedOutItems[i], num.decommissionedReplicas(), getReplication(bi));
num.liveReplicas(),
num.decommissionedReplicas(),
getReplication(timedOutItems[i]));
} }
} }
} finally { } finally {

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
/** /**
* This class tests the internals of PendingReplicationBlocks.java, * This class tests the internals of PendingReplicationBlocks.java,
@ -52,13 +53,11 @@ public class TestPendingReplication {
private static final int DFS_REPLICATION_INTERVAL = 1; private static final int DFS_REPLICATION_INTERVAL = 1;
// Number of datanodes in the cluster // Number of datanodes in the cluster
private static final int DATANODE_COUNT = 5; private static final int DATANODE_COUNT = 5;
@Test @Test
public void testPendingReplication() { public void testPendingReplication() {
PendingReplicationBlocks pendingReplications; PendingReplicationBlocks pendingReplications;
pendingReplications = new PendingReplicationBlocks(TIMEOUT * 1000); pendingReplications = new PendingReplicationBlocks(TIMEOUT * 1000);
pendingReplications.start(); pendingReplications.start();
// //
// Add 10 blocks to pendingReplications. // Add 10 blocks to pendingReplications.
// //
@ -140,8 +139,7 @@ public class TestPendingReplication {
// //
// Verify that everything has timed out. // Verify that everything has timed out.
// //
assertEquals("Size of pendingReplications ", assertEquals("Size of pendingReplications ", 0, pendingReplications.size());
0, pendingReplications.size());
Block[] timedOut = pendingReplications.getTimedOutBlocks(); Block[] timedOut = pendingReplications.getTimedOutBlocks();
assertTrue(timedOut != null && timedOut.length == 15); assertTrue(timedOut != null && timedOut.length == 15);
for (int i = 0; i < timedOut.length; i++) { for (int i = 0; i < timedOut.length; i++) {
@ -150,6 +148,98 @@ public class TestPendingReplication {
pendingReplications.stop(); pendingReplications.stop();
} }
/* Test that processPendingReplications will use the most recent
* blockinfo from the blocksmap by placing a larger genstamp into
* the blocksmap.
*/
@Test
public void testProcessPendingReplications() throws Exception {
final Configuration conf = new HdfsConfiguration();
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, TIMEOUT);
MiniDFSCluster cluster = null;
Block block;
BlockInfo blockInfo;
try {
cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_COUNT).build();
cluster.waitActive();
FSNamesystem fsn = cluster.getNamesystem();
BlockManager blkManager = fsn.getBlockManager();
PendingReplicationBlocks pendingReplications =
blkManager.pendingReplications;
UnderReplicatedBlocks neededReplications = blkManager.neededReplications;
BlocksMap blocksMap = blkManager.blocksMap;
//
// Add 1 block to pendingReplications with GenerationStamp = 0.
//
block = new Block(1, 1, 0);
blockInfo = new BlockInfo(block, (short) 3);
pendingReplications.increment(block,
DatanodeStorageInfo.toDatanodeDescriptors(
DFSTestUtil.createDatanodeStorageInfos(1)));
BlockCollection bc = Mockito.mock(BlockCollection.class);
Mockito.doReturn((short) 3).when(bc).getBlockReplication();
// Place into blocksmap with GenerationStamp = 1
blockInfo.setGenerationStamp(1);
blocksMap.addBlockCollection(blockInfo, bc);
assertEquals("Size of pendingReplications ", 1,
pendingReplications.size());
// Add a second block to pendingReplications that has no
// corresponding entry in blocksmap
block = new Block(2, 2, 0);
pendingReplications.increment(block,
DatanodeStorageInfo.toDatanodeDescriptors(
DFSTestUtil.createDatanodeStorageInfos(1)));
// verify 2 blocks in pendingReplications
assertEquals("Size of pendingReplications ", 2,
pendingReplications.size());
//
// Wait for everything to timeout.
//
while (pendingReplications.size() > 0) {
try {
Thread.sleep(100);
} catch (Exception e) {
}
}
//
// Verify that block moves to neededReplications
//
while (neededReplications.size() == 0) {
try {
Thread.sleep(100);
} catch (Exception e) {
}
}
// Verify that the generation stamp we will try to replicate
// is now 1
for (Block b: neededReplications) {
assertEquals("Generation stamp is 1 ", 1,
b.getGenerationStamp());
}
// Verify size of neededReplications is exactly 1.
assertEquals("size of neededReplications is 1 ", 1,
neededReplications.size());
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/** /**
* Test if DatanodeProtocol#blockReceivedAndDeleted can correctly update the * Test if DatanodeProtocol#blockReceivedAndDeleted can correctly update the
* pending replications. Also make sure the blockReceivedAndDeleted call is * pending replications. Also make sure the blockReceivedAndDeleted call is