From 4d106213c0f4835b723c9a50bd8080a9017122d7 Mon Sep 17 00:00:00 2001
From: Jing Zhao
Date: Fri, 7 Oct 2016 22:44:54 -0700
Subject: [PATCH] HDFS-10968. BlockManager#isInNewRack should consider decommissioning nodes. Contributed by Jing Zhao.

---
 .../server/blockmanagement/BlockManager.java  |   6 +-
 ...nstructStripedBlocksWithRackAwareness.java | 158 ++++++++++++++----
 2 files changed, 130 insertions(+), 34 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 8b746094458..7949439b431 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1781,8 +1781,12 @@ public class BlockManager implements BlockStatsMXBean {
 
   private boolean isInNewRack(DatanodeDescriptor[] srcs,
       DatanodeDescriptor target) {
+    LOG.debug("check if target {} increases racks, srcs={}", target,
+        Arrays.asList(srcs));
     for (DatanodeDescriptor src : srcs) {
-      if (src.getNetworkLocation().equals(target.getNetworkLocation())) {
+      if (!src.isDecommissionInProgress() &&
+          src.getNetworkLocation().equals(target.getNetworkLocation())) {
+        LOG.debug("the target {} is in the same rack with src {}", target, src);
         return false;
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
index 152e153174f..3bc13a8c36d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
@@ -35,12 +35,14 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -58,57 +60,44 @@ public class TestReconstructStripedBlocksWithRackAwareness {
     GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
   }
 
-  private static final String[] hosts = getHosts();
-  private static final String[] racks = getRacks();
+  private static final String[] hosts =
+      getHosts(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 1);
+  private static final String[] racks =
+      getRacks(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 1, NUM_DATA_BLOCKS);
 
-  private static String[] getHosts() {
-    String[] hosts = new String[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 1];
+  private static String[] getHosts(int numHosts) {
+    String[] hosts = new String[numHosts];
     for (int i = 0; i < hosts.length; i++) {
       hosts[i] = "host" + (i + 1);
     }
     return hosts;
   }
 
-  private static String[] getRacks() {
-    String[] racks = new String[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 1];
-    int numHostEachRack = (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS - 1) /
-        (NUM_DATA_BLOCKS - 1) + 1;
+  private static String[] getRacks(int numHosts, int numRacks) {
+    String[] racks = new String[numHosts];
+    int numHostEachRack = numHosts / numRacks;
+    int residue = numHosts % numRacks;
     int j = 0;
-    // we have NUM_DATA_BLOCKS racks
-    for (int i = 1; i <= NUM_DATA_BLOCKS; i++) {
-      if (j == racks.length - 1) {
-        assert i == NUM_DATA_BLOCKS;
+    for (int i = 1; i <= numRacks; i++) {
+      int limit = i <= residue ? numHostEachRack + 1 : numHostEachRack;
+      for (int k = 0; k < limit; k++) {
         racks[j++] = "/r" + i;
-      } else {
-        for (int k = 0; k < numHostEachRack && j < racks.length - 1; k++) {
-          racks[j++] = "/r" + i;
-        }
       }
     }
+    assert j == numHosts;
     return racks;
   }
 
   private MiniDFSCluster cluster;
+  private static final HdfsConfiguration conf = new HdfsConfiguration();
   private DistributedFileSystem fs;
-  private FSNamesystem fsn;
-  private BlockManager bm;
 
-  @Before
-  public void setup() throws Exception {
-    final HdfsConfiguration conf = new HdfsConfiguration();
+  @BeforeClass
+  public static void setup() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
         false);
-
-    cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
-        .numDataNodes(hosts.length).build();
-    cluster.waitActive();
-
-    fsn = cluster.getNamesystem();
-    bm = fsn.getBlockManager();
-
-    fs = cluster.getFileSystem();
-    fs.setErasureCodingPolicy(new Path("/"), null);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
   }
 
   @After
@@ -132,6 +121,15 @@ public class TestReconstructStripedBlocksWithRackAwareness {
     return dnProp;
   }
 
+  private DataNode getDataNode(String host) {
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (dn.getDatanodeId().getHostName().equals(host)) {
+        return dn;
+      }
+    }
+    return null;
+  }
+
   /**
    * When there are all the internal blocks available but they are not placed on
    * enough racks, NameNode should avoid normal decoding reconstruction but copy
@@ -143,9 +141,19 @@
    */
   @Test
   public void testReconstructForNotEnoughRacks() throws Exception {
+    LOG.info("cluster hosts: {}, racks: {}", Arrays.asList(hosts),
+        Arrays.asList(racks));
+
+    cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
+        .numDataNodes(hosts.length).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fs.setErasureCodingPolicy(new Path("/"), null);
+    FSNamesystem fsn = cluster.getNamesystem();
+    BlockManager bm = fsn.getBlockManager();
+
     MiniDFSCluster.DataNodeProperties lastHost = stopDataNode(
         hosts[hosts.length - 1]);
-
     final Path file = new Path("/foo");
     // the file's block is in 9 dn but 5 racks
     DFSTestUtil.createFile(fs, file,
@@ -206,6 +214,12 @@
 
   @Test
   public void testChooseExcessReplicasToDelete() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
+        .numDataNodes(hosts.length).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fs.setErasureCodingPolicy(new Path("/"), null);
+
     MiniDFSCluster.DataNodeProperties lastHost = stopDataNode(
         hosts[hosts.length - 1]);
 
@@ -242,4 +256,82 @@
       Assert.assertFalse(dn.getHostName().equals("host1"));
     }
   }
+
+  /**
+   * In case we have 10 internal blocks on 5 racks, where 9 blocks are live
+   * and 1 is decommissioning, make sure the reconstruction happens correctly.
+   */
+  @Test
+  public void testReconstructionWithDecommission() throws Exception {
+    final String[] racks = getRacks(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2,
+        NUM_DATA_BLOCKS);
+    final String[] hosts = getHosts(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2);
+    // we now have 11 hosts on 6 racks with distribution: 2-2-2-2-2-1
+    cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
+        .numDataNodes(hosts.length).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fs.setErasureCodingPolicy(new Path("/"), null);
+
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    final DatanodeManager dm = bm.getDatanodeManager();
+
+    // stop h9 and h10 and create a file with 6+3 internal blocks
+    MiniDFSCluster.DataNodeProperties h9 = stopDataNode(hosts[hosts.length - 3]);
+    MiniDFSCluster.DataNodeProperties h10 = stopDataNode(hosts[hosts.length - 2]);
+    final Path file = new Path("/foo");
+    DFSTestUtil.createFile(fs, file,
+        BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS * 2, (short) 1, 0L);
+    final BlockInfo blockInfo = cluster.getNamesystem().getFSDirectory()
+        .getINode(file.toString()).asFile().getLastBlock();
+
+    // bring h9 back
+    cluster.restartDataNode(h9);
+    cluster.waitActive();
+
+    // stop h11 so that the reconstruction happens
+    MiniDFSCluster.DataNodeProperties h11 = stopDataNode(hosts[hosts.length - 1]);
+    boolean recovered = bm.countNodes(blockInfo).liveReplicas() >=
+        NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+    for (int i = 0; i < 10 && !recovered; i++) {
+      Thread.sleep(1000);
+      recovered = bm.countNodes(blockInfo).liveReplicas() >=
+          NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+    }
+    Assert.assertTrue(recovered);
+
+    // mark h9 as decommissioning
+    DataNode datanode9 = getDataNode(hosts[hosts.length - 3]);
+    Assert.assertNotNull(datanode9);
+    final DatanodeDescriptor dn9 = dm.getDatanode(datanode9.getDatanodeId());
+    dn9.startDecommission();
+
+    // restart h10 and h11
+    cluster.restartDataNode(h10);
+    cluster.restartDataNode(h11);
+    cluster.waitActive();
+    DataNodeTestUtils.triggerBlockReport(getDataNode(hosts[hosts.length - 1]));
+
+    // start decommissioning h9
+    boolean satisfied = bm.isPlacementPolicySatisfied(blockInfo);
+    Assert.assertFalse(satisfied);
+    final DecommissionManager decomManager =
+        (DecommissionManager) Whitebox.getInternalState(dm, "decomManager");
+    cluster.getNamesystem().writeLock();
+    try {
+      dn9.stopDecommission();
+      decomManager.startDecommission(dn9);
+    } finally {
+      cluster.getNamesystem().writeUnlock();
+    }
+
+    // make sure the decommission finishes and the block is on 6 racks
+    boolean decommissioned = dn9.isDecommissioned();
+    for (int i = 0; i < 10 && !decommissioned; i++) {
+      Thread.sleep(1000);
+      decommissioned = dn9.isDecommissioned();
+    }
+    Assert.assertTrue(decommissioned);
+    Assert.assertTrue(bm.isPlacementPolicySatisfied(blockInfo));
+  }
 }
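
The rule that the BlockManager#isInNewRack change enforces can be seen in isolation with the following standalone sketch. It is illustrative only and not part of the patch: the Node class, its rack and decommissioning fields, and the class name are hypothetical stand-ins for the DatanodeDescriptor state (getNetworkLocation() and isDecommissionInProgress()) that the real method consults; only the comparison inside the loop mirrors the patched logic.

import java.util.List;

// Illustrative sketch only; Node is a hypothetical stand-in for the
// DatanodeDescriptor state consulted by BlockManager#isInNewRack.
public class IsInNewRackSketch {

  static final class Node {
    final String rack;              // network location, e.g. "/r1"
    final boolean decommissioning;  // stand-in for isDecommissionInProgress()

    Node(String rack, boolean decommissioning) {
      this.rack = rack;
      this.decommissioning = decommissioning;
    }
  }

  // Returns true if placing a copy on 'target' adds a rack that no remaining
  // source covers. Decommissioning sources are skipped, because their replicas
  // will eventually leave; counting their rack as already covered can block a
  // copy that is needed to keep the block on enough racks.
  static boolean isInNewRack(List<Node> srcs, Node target) {
    for (Node src : srcs) {
      if (!src.decommissioning && src.rack.equals(target.rack)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Node decommissioningSrc = new Node("/r1", true);
    Node liveSrc = new Node("/r2", false);
    Node target = new Node("/r1", false);
    // Only a decommissioning source shares /r1 with the target, so the copy
    // still increases the number of racks holding live replicas: prints true.
    System.out.println(isInNewRack(List.of(decommissioningSrc, liveSrc), target));
  }
}

Without the decommissioning check, the /r1 source in this example would mask the target's rack, isInNewRack would return false, and the NameNode would skip the extra copy even though the only replica on that rack is about to leave the cluster.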