HDFS-10968. BlockManager#isInNewRack should consider decommissioning nodes. Contributed by Jing Zhao.
parent: 6a38d118d8
commit: 4d106213c0
BlockManager.java:

@@ -1781,8 +1781,12 @@ public class BlockManager implements BlockStatsMXBean {
 
   private boolean isInNewRack(DatanodeDescriptor[] srcs,
       DatanodeDescriptor target) {
+    LOG.debug("check if target {} increases racks, srcs={}", target,
+        Arrays.asList(srcs));
     for (DatanodeDescriptor src : srcs) {
-      if (src.getNetworkLocation().equals(target.getNetworkLocation())) {
+      if (!src.isDecommissionInProgress() &&
+          src.getNetworkLocation().equals(target.getNetworkLocation())) {
+        LOG.debug("the target {} is in the same rack with src {}", target, src);
         return false;
       }
     }
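The behavioral change is easy to miss in the diff: a source replica on the target's rack now only disqualifies the target when that source is not being decommissioned, because a decommissioning node is about to leave and its rack should not be counted as already covered. Below is a minimal, self-contained Java sketch of just that rule; Node and RackCheckSketch are simplified stand-ins invented for this illustration, not the real DatanodeDescriptor API.

import java.util.List;

// Simplified stand-in for DatanodeDescriptor, used only for this illustration.
record Node(String rack, boolean decommissioning) {}

public class RackCheckSketch {
  // Same rule as the patched isInNewRack: the target fails to add a new rack
  // only if a source that is NOT decommissioning already occupies its rack.
  static boolean isInNewRack(List<Node> srcs, Node target) {
    for (Node src : srcs) {
      if (!src.decommissioning() && src.rack().equals(target.rack())) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<Node> srcs = List.of(
        new Node("/r1", false),
        new Node("/r2", true)); // the only source on /r2 is decommissioning
    // The old check returned false here (same rack as a source); the new check
    // returns true, because the /r2 source is leaving, so placing the target
    // there still increases the number of covered racks.
    System.out.println(isInNewRack(srcs, new Node("/r2", false)));
  }
}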
TestReconstructStripedBlocksWithRackAwareness.java:

@@ -35,12 +35,14 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
@@ -58,57 +60,44 @@ public class TestReconstructStripedBlocksWithRackAwareness {
     GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
   }
 
-  private static final String[] hosts = getHosts();
-  private static final String[] racks = getRacks();
+  private static final String[] hosts =
+      getHosts(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 1);
+  private static final String[] racks =
+      getRacks(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 1, NUM_DATA_BLOCKS);
 
-  private static String[] getHosts() {
-    String[] hosts = new String[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 1];
+  private static String[] getHosts(int numHosts) {
+    String[] hosts = new String[numHosts];
     for (int i = 0; i < hosts.length; i++) {
       hosts[i] = "host" + (i + 1);
     }
     return hosts;
   }
 
-  private static String[] getRacks() {
-    String[] racks = new String[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 1];
-    int numHostEachRack = (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS - 1) /
-        (NUM_DATA_BLOCKS - 1) + 1;
+  private static String[] getRacks(int numHosts, int numRacks) {
+    String[] racks = new String[numHosts];
+    int numHostEachRack = numHosts / numRacks;
+    int residue = numHosts % numRacks;
     int j = 0;
-    // we have NUM_DATA_BLOCKS racks
-    for (int i = 1; i <= NUM_DATA_BLOCKS; i++) {
-      if (j == racks.length - 1) {
-        assert i == NUM_DATA_BLOCKS;
+    for (int i = 1; i <= numRacks; i++) {
+      int limit = i <= residue ? numHostEachRack + 1 : numHostEachRack;
+      for (int k = 0; k < limit; k++) {
         racks[j++] = "/r" + i;
-      } else {
-        for (int k = 0; k < numHostEachRack && j < racks.length - 1; k++) {
-          racks[j++] = "/r" + i;
-        }
       }
     }
+    assert j == numHosts;
     return racks;
   }
 
   private MiniDFSCluster cluster;
+  private static final HdfsConfiguration conf = new HdfsConfiguration();
   private DistributedFileSystem fs;
-  private FSNamesystem fsn;
-  private BlockManager bm;
 
-  @Before
-  public void setup() throws Exception {
-    final HdfsConfiguration conf = new HdfsConfiguration();
+  @BeforeClass
+  public static void setup() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
        false);
-
-    cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
-        .numDataNodes(hosts.length).build();
-    cluster.waitActive();
-
-    fsn = cluster.getNamesystem();
-    bm = fsn.getBlockManager();
-
-    fs = cluster.getFileSystem();
-    fs.setErasureCodingPolicy(new Path("/"), null);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
   }
 
   @After
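The rewritten getRacks(numHosts, numRacks) spreads hosts across racks as evenly as possible, giving each of the first numHosts % numRacks racks one extra host. A standalone copy of that logic (RackLayoutSketch is a name made up for this illustration; only the loop body comes from the patch) makes the layouts used by the tests easy to check:

import java.util.Arrays;

// Standalone copy of the new getRacks logic from the test, for illustration.
public class RackLayoutSketch {

  static String[] getRacks(int numHosts, int numRacks) {
    String[] racks = new String[numHosts];
    int numHostEachRack = numHosts / numRacks;
    int residue = numHosts % numRacks;
    int j = 0;
    for (int i = 1; i <= numRacks; i++) {
      // the first 'residue' racks absorb the leftover hosts
      int limit = i <= residue ? numHostEachRack + 1 : numHostEachRack;
      for (int k = 0; k < limit; k++) {
        racks[j++] = "/r" + i;
      }
    }
    return racks;
  }

  public static void main(String[] args) {
    // 10 hosts on 6 racks (the class-level default: 6+3 blocks plus one spare
    // host) -> distribution 2-2-2-2-1-1.
    System.out.println(Arrays.toString(getRacks(10, 6)));
    // 11 hosts on 6 racks -> distribution 2-2-2-2-2-1, the layout that
    // testReconstructionWithDecommission builds explicitly.
    System.out.println(Arrays.toString(getRacks(11, 6)));
  }
}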
@@ -132,6 +121,15 @@ public class TestReconstructStripedBlocksWithRackAwareness {
     return dnProp;
   }
 
+  private DataNode getDataNode(String host) {
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (dn.getDatanodeId().getHostName().equals(host)) {
+        return dn;
+      }
+    }
+    return null;
+  }
+
   /**
    * When there are all the internal blocks available but they are not placed on
    * enough racks, NameNode should avoid normal decoding reconstruction but copy
@@ -143,9 +141,19 @@ public class TestReconstructStripedBlocksWithRackAwareness {
    */
   @Test
   public void testReconstructForNotEnoughRacks() throws Exception {
+    LOG.info("cluster hosts: {}, racks: {}", Arrays.asList(hosts),
+        Arrays.asList(racks));
+
+    cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
+        .numDataNodes(hosts.length).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fs.setErasureCodingPolicy(new Path("/"), null);
+    FSNamesystem fsn = cluster.getNamesystem();
+    BlockManager bm = fsn.getBlockManager();
+
     MiniDFSCluster.DataNodeProperties lastHost = stopDataNode(
         hosts[hosts.length - 1]);
 
     final Path file = new Path("/foo");
     // the file's block is in 9 dn but 5 racks
     DFSTestUtil.createFile(fs, file,
@@ -206,6 +214,12 @@ public class TestReconstructStripedBlocksWithRackAwareness {
 
   @Test
   public void testChooseExcessReplicasToDelete() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
+        .numDataNodes(hosts.length).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fs.setErasureCodingPolicy(new Path("/"), null);
+
     MiniDFSCluster.DataNodeProperties lastHost = stopDataNode(
         hosts[hosts.length - 1]);
 
@@ -242,4 +256,82 @@ public class TestReconstructStripedBlocksWithRackAwareness {
       Assert.assertFalse(dn.getHostName().equals("host1"));
     }
   }
+
+  /**
+   * In case we have 10 internal blocks on 5 racks, where 9 blocks are live
+   * and 1 is decommissioning, make sure the reconstruction happens correctly.
+   */
+  @Test
+  public void testReconstructionWithDecommission() throws Exception {
+    final String[] racks = getRacks(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2,
+        NUM_DATA_BLOCKS);
+    final String[] hosts = getHosts(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2);
+    // we now have 11 hosts on 6 racks with distribution: 2-2-2-2-2-1
+    cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
+        .numDataNodes(hosts.length).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fs.setErasureCodingPolicy(new Path("/"), null);
+
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    final DatanodeManager dm = bm.getDatanodeManager();
+
+    // stop h9 and h10 and create a file with 6+3 internal blocks
+    MiniDFSCluster.DataNodeProperties h9 = stopDataNode(hosts[hosts.length - 3]);
+    MiniDFSCluster.DataNodeProperties h10 = stopDataNode(hosts[hosts.length - 2]);
+    final Path file = new Path("/foo");
+    DFSTestUtil.createFile(fs, file,
+        BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS * 2, (short) 1, 0L);
+    final BlockInfo blockInfo = cluster.getNamesystem().getFSDirectory()
+        .getINode(file.toString()).asFile().getLastBlock();
+
+    // bring h9 back
+    cluster.restartDataNode(h9);
+    cluster.waitActive();
+
+    // stop h11 so that the reconstruction happens
+    MiniDFSCluster.DataNodeProperties h11 = stopDataNode(hosts[hosts.length - 1]);
+    boolean recovered = bm.countNodes(blockInfo).liveReplicas() >=
+        NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+    for (int i = 0; i < 10 && !recovered; i++) {
+      Thread.sleep(1000);
+      recovered = bm.countNodes(blockInfo).liveReplicas() >=
+          NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+    }
+    Assert.assertTrue(recovered);
+
+    // mark h9 as decommissioning
+    DataNode datanode9 = getDataNode(hosts[hosts.length - 3]);
+    Assert.assertNotNull(datanode9);
+    final DatanodeDescriptor dn9 = dm.getDatanode(datanode9.getDatanodeId());
+    dn9.startDecommission();
+
+    // restart h10 and h11
+    cluster.restartDataNode(h10);
+    cluster.restartDataNode(h11);
+    cluster.waitActive();
+    DataNodeTestUtils.triggerBlockReport(getDataNode(hosts[hosts.length - 1]));
+
+    // start decommissioning h9
+    boolean satisfied = bm.isPlacementPolicySatisfied(blockInfo);
+    Assert.assertFalse(satisfied);
+    final DecommissionManager decomManager =
+        (DecommissionManager) Whitebox.getInternalState(dm, "decomManager");
+    cluster.getNamesystem().writeLock();
+    try {
+      dn9.stopDecommission();
+      decomManager.startDecommission(dn9);
+    } finally {
+      cluster.getNamesystem().writeUnlock();
+    }
+
+    // make sure the decommission finishes and the block is on 6 racks
+    boolean decommissioned = dn9.isDecommissioned();
+    for (int i = 0; i < 10 && !decommissioned; i++) {
+      Thread.sleep(1000);
+      decommissioned = dn9.isDecommissioned();
+    }
+    Assert.assertTrue(decommissioned);
+    Assert.assertTrue(bm.isPlacementPolicySatisfied(blockInfo));
+  }
 }
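Both wait loops in the new test poll a condition once a second for at most ten tries. A tiny helper along the following lines could express that pattern once; PollSketch and its waitFor are hypothetical names for this illustration and are not part of the patch or of Hadoop's test utilities.

import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of the patch: retry a boolean condition with a
// fixed sleep, mirroring the recovered/decommissioned loops in the new test.
public final class PollSketch {

  static boolean waitFor(BooleanSupplier condition, long sleepMillis,
      int maxTries) throws InterruptedException {
    boolean met = condition.getAsBoolean();
    for (int i = 0; i < maxTries && !met; i++) {
      Thread.sleep(sleepMillis);
      met = condition.getAsBoolean();
    }
    return met;
  }

  public static void main(String[] args) throws InterruptedException {
    long deadline = System.currentTimeMillis() + 2500;
    // Example: poll once a second until the deadline has passed (or 10 tries).
    boolean done = waitFor(() -> System.currentTimeMillis() >= deadline, 1000, 10);
    System.out.println("condition met: " + done);
  }
}

With such a helper, the test's assertions would read, for example, Assert.assertTrue(waitFor(dn9::isDecommissioned, 1000, 10)), keeping the sleep-and-retry bookkeeping in one place.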