HDFS-16456. EC: Decommission a rack with only one dn will fail when the rack number is equal with replication (#4126) (#4304)
(cherry picked from commit cee8c62498)
Conflicts:
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
(cherry picked from commit dd79aee635fdc61648e0c87bea1560dc35aee053)
Co-authored-by: caozhiqiang <lfxy@163.com>
Reviewed-by: Takanobu Asanuma <tasanuma@apache.org>
parent e0732baeb8, commit ba856bff95
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java

@@ -101,6 +101,13 @@ public class NetworkTopology {
   private int depthOfAllLeaves = -1;
   /** rack counter */
   protected int numOfRacks = 0;
+  /** empty rack map, rackname->nodenumber. */
+  private HashMap<String, Set<String>> rackMap =
+      new HashMap<String, Set<String>>();
+  /** decommission nodes, containing stopped nodes. */
+  private HashSet<String> decommissionNodes = new HashSet<>();
+  /** empty rack counter. */
+  private int numOfEmptyRacks = 0;
 
   /**
    * Whether or not this cluster has ever consisted of more than 1 rack,
@@ -150,6 +157,7 @@ public class NetworkTopology {
       if (rack == null) {
         incrementRacks();
       }
+      interAddNodeWithEmptyRack(node);
       if (depthOfAllLeaves == -1) {
         depthOfAllLeaves = node.getLevel();
       }
@@ -226,6 +234,7 @@ public class NetworkTopology {
         if (rack == null) {
           numOfRacks--;
         }
+        interRemoveNodeWithEmptyRack(node);
       }
       LOG.debug("NetworkTopology became:\n{}", this);
     } finally {
@@ -989,4 +998,108 @@ public class NetworkTopology {
     Preconditions.checkState(idx == activeLen,
         "Sorted the wrong number of nodes!");
   }
+
+  /** @return the number of nonempty racks */
+  public int getNumOfNonEmptyRacks() {
+    return numOfRacks - numOfEmptyRacks;
+  }
+
+  /**
+   * Update empty rack number when adding a node, e.g. recommission.
+   * @param node node to be added; can be null
+   */
+  public void recommissionNode(Node node) {
+    if (node == null) {
+      return;
+    }
+    if (node instanceof InnerNode) {
+      throw new IllegalArgumentException(
+          "Not allow to remove an inner node: " + NodeBase.getPath(node));
+    }
+    netlock.writeLock().lock();
+    try {
+      decommissionNodes.remove(node.getName());
+      interAddNodeWithEmptyRack(node);
+    } finally {
+      netlock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Update empty rack number when removing a node, e.g. decommission.
+   * @param node node to be removed; can be null
+   */
+  public void decommissionNode(Node node) {
+    if (node == null) {
+      return;
+    }
+    if (node instanceof InnerNode) {
+      throw new IllegalArgumentException(
+          "Not allow to remove an inner node: " + NodeBase.getPath(node));
+    }
+    netlock.writeLock().lock();
+    try {
+      decommissionNodes.add(node.getName());
+      interRemoveNodeWithEmptyRack(node);
+    } finally {
+      netlock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Internal function to update the empty rack number
+   * when adding or recommissioning a node.
+   * @param node node to be added; can be null
+   */
+  private void interAddNodeWithEmptyRack(Node node) {
+    if (node == null) {
+      return;
+    }
+    String rackname = node.getNetworkLocation();
+    Set<String> nodes = rackMap.get(rackname);
+    if (nodes == null) {
+      nodes = new HashSet<String>();
+    }
+    if (!decommissionNodes.contains(node.getName())) {
+      nodes.add(node.getName());
+    }
+    rackMap.put(rackname, nodes);
+    countEmptyRacks();
+  }
+
+  /**
+   * Internal function to update the empty rack number
+   * when removing or decommissioning a node.
+   * @param node node to be removed; can be null
+   */
+  private void interRemoveNodeWithEmptyRack(Node node) {
+    if (node == null) {
+      return;
+    }
+    String rackname = node.getNetworkLocation();
+    Set<String> nodes = rackMap.get(rackname);
+    if (nodes != null) {
+      InnerNode rack = (InnerNode) getNode(node.getNetworkLocation());
+      if (rack == null) {
+        // this node and its rack are both removed.
+        rackMap.remove(rackname);
+      } else if (nodes.contains(node.getName())) {
+        // this node is decommissioned or removed.
+        nodes.remove(node.getName());
+        rackMap.put(rackname, nodes);
+      }
+      countEmptyRacks();
+    }
+  }
+
+  private void countEmptyRacks() {
+    int count = 0;
+    for (Set<String> nodes : rackMap.values()) {
+      if (nodes != null && nodes.isEmpty()) {
+        count++;
+      }
+    }
+    numOfEmptyRacks = count;
+    LOG.debug("Current numOfEmptyRacks is {}", numOfEmptyRacks);
+  }
 }
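Note that the new accounting never mutates the existing numOfRacks counter; getNumOfNonEmptyRacks() simply subtracts numOfEmptyRacks, so existing callers of getNumOfRacks() keep their old behavior. A minimal usage sketch of the intended semantics (not part of the patch; host and rack names are made up, and the topology is constructed with the usual getInstance factory):

// EmptyRackSketch.java -- illustrates the new counters only.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;

public class EmptyRackSketch {
  public static void main(String[] args) {
    NetworkTopology topo = NetworkTopology.getInstance(new Configuration());
    Node a = new NodeBase("hostA", "/rack1");
    Node b = new NodeBase("hostB", "/rack1");
    Node c = new NodeBase("hostC", "/rack2");
    topo.add(a);
    topo.add(b);
    topo.add(c);

    // Decommissioning does not remove the node from the tree, so the raw
    // rack count is unchanged; only the non-empty count drops.
    topo.decommissionNode(c);
    assert topo.getNumOfRacks() == 2;
    assert topo.getNumOfNonEmptyRacks() == 1;

    // Recommissioning restores the rack to the usable set.
    topo.recommissionNode(c);
    assert topo.getNumOfNonEmptyRacks() == 2;
  }
}

Decommissioned nodes are tracked by node.getName() in the decommissionNodes set, which is why interAddNodeWithEmptyRack consults that set before counting a node against its rack: a node that re-registers while still decommissioning must not make its rack non-empty again.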
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -304,7 +304,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         && stats.isAvoidingStaleDataNodesForWrite());
     boolean avoidLocalRack = (addBlockFlags != null
         && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_RACK) && writer != null
-        && clusterMap.getNumOfRacks() > 2);
+        && clusterMap.getNumOfNonEmptyRacks() > 2);
     boolean avoidLocalNode = (addBlockFlags != null
         && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
         && writer != null
@@ -385,7 +385,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       totalNumOfReplicas = clusterSize;
     }
     // No calculation needed when there is only one rack or picking one node.
-    int numOfRacks = clusterMap.getNumOfRacks();
+    int numOfRacks = clusterMap.getNumOfNonEmptyRacks();
     // HDFS-14527 return default when numOfRacks = 0 to avoid
     // ArithmeticException when calc maxNodesPerRack at following logic.
     if (numOfRacks <= 1 || totalNumOfReplicas <= 1) {
@@ -1173,7 +1173,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         .map(dn -> dn.getNetworkLocation()).distinct().count();
 
     return new BlockPlacementStatusDefault(Math.toIntExact(rackCount),
-        minRacks, clusterMap.getNumOfRacks());
+        minRacks, clusterMap.getNumOfNonEmptyRacks());
   }
 
   /**
@@ -1370,4 +1370,3 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     return excludeSlowNodesEnabled;
   }
 }
-
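The verifyBlockPlacement change above is what lets a decommission finish: BlockPlacementStatusDefault treats placement as satisfied when enough distinct racks are used, or when every known rack is used. A worked, statement-level sketch with the numbers from the scenario in the commit title (the boolean expression paraphrases isPlacementPolicySatisfied as I understand it; RS-3-2 wants 5 racks):

// Paraphrase of BlockPlacementStatusDefault#isPlacementPolicySatisfied:
// satisfied when enough racks are used, or when every known rack is used.
int requiredRacks = 5;   // an RS-3-2 block group wants 5 distinct racks
int currentRacks = 4;    // racks holding replicas once RACK4's only DN drains

int totalRacks = 5;      // before: getNumOfRacks() still counts the empty rack
boolean before = requiredRacks <= currentRacks || currentRacks >= totalRacks;
// before == false, forever: the block stays "misplaced" and the
// decommission never completes.

totalRacks = 4;          // after: getNumOfNonEmptyRacks()
boolean after = requiredRacks <= currentRacks || currentRacks >= totalRacks;
// after == true: all 4 usable racks are covered.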
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java

@@ -42,7 +42,7 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyDefault {
       totalNumOfReplicas = clusterSize;
     }
     // No calculation needed when there is only one rack or picking one node.
-    int numOfRacks = clusterMap.getNumOfRacks();
+    int numOfRacks = clusterMap.getNumOfNonEmptyRacks();
     // HDFS-14527 return default when numOfRacks = 0 to avoid
     // ArithmeticException when calc maxNodesPerRack at following logic.
     if (numOfRacks <= 1 || totalNumOfReplicas <= 1) {
@@ -90,7 +90,9 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyDefault {
       EnumMap<StorageType, Integer> storageTypes)
       throws NotEnoughReplicasException {
     int totalReplicaExpected = results.size() + numOfReplicas;
-    int numOfRacks = clusterMap.getNumOfRacks();
+    int numOfRacks = clusterMap.getNumOfNonEmptyRacks();
+
+    try {
       if (totalReplicaExpected < numOfRacks ||
           totalReplicaExpected % numOfRacks == 0) {
         writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
@@ -121,7 +123,6 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyDefault {
       numOfReplicas = Math.min(totalReplicaExpected - results.size(),
           (maxNodesPerRack -1) * numOfRacks - (results.size() - excess));
 
-    try {
       // Try to spread the replicas as evenly as possible across racks.
       // This is done by first placing with (maxNodesPerRack-1), then spreading
       // the remainder by calling again with maxNodesPerRack.
@@ -243,7 +244,7 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyDefault {
       racks.add(dn.getNetworkLocation());
     }
     return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas,
-        clusterMap.getNumOfRacks());
+        clusterMap.getNumOfNonEmptyRacks());
   }
 
   @Override
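The rack count also feeds the even-spread shortcut in chooseTargetInOrder above, which is where placement failed outright before the fix. A worked, statement-level sketch of that branch with illustrative numbers (not code from the patch):

int totalReplicaExpected = 5;   // RS-3-2: 3 data + 2 parity locations

int numOfRacks = 5;             // before: getNumOfRacks() counts the empty rack
boolean onePerRack = totalReplicaExpected < numOfRacks
    || totalReplicaExpected % numOfRacks == 0;   // 5 % 5 == 0 -> true
// chooseOnce() must then find one target on every rack, including the rack
// whose only DataNode is decommissioning -> NotEnoughReplicasException.

numOfRacks = 4;                 // after: getNumOfNonEmptyRacks()
onePerRack = totalReplicaExpected < numOfRacks
    || totalReplicaExpected % numOfRacks == 0;   // 5 % 4 != 0 -> false
// The policy falls through to the two-phase spread, which may place a
// second replica on one of the four non-empty racks.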
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java

@@ -176,6 +176,8 @@ public class DatanodeAdminManager {
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
       // Update DN stats maintained by HeartbeatManager
       hbManager.startDecommission(node);
+      // Update cluster's emptyRack
+      blockManager.getDatanodeManager().getNetworkTopology().decommissionNode(node);
       // hbManager.startDecommission will set dead node to decommissioned.
       if (node.isDecommissionInProgress()) {
         for (DatanodeStorageInfo storage : node.getStorageInfos()) {
@@ -200,6 +202,8 @@ public class DatanodeAdminManager {
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
       // Update DN stats maintained by HeartbeatManager
       hbManager.stopDecommission(node);
+      // Update cluster's emptyRack
+      blockManager.getDatanodeManager().getNetworkTopology().recommissionNode(node);
       // extra redundancy blocks will be detected and processed when
       // the dead node comes back and send in its full block report.
       if (node.isAlive()) {
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -8233,7 +8233,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         getBlockManager().getDatanodeManager().getNumOfDataNodes();
     int numOfRacks =
         getBlockManager().getDatanodeManager().getNetworkTopology()
-            .getNumOfRacks();
+            .getNumOfNonEmptyRacks();
     result = ECTopologyVerifier
         .getECTopologyVerifierResult(numOfRacks, numOfDataNodes, policies);
   }
@@ -8676,7 +8676,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     int numOfDataNodes =
         getBlockManager().getDatanodeManager().getNumOfDataNodes();
     int numOfRacks = getBlockManager().getDatanodeManager().getNetworkTopology()
-        .getNumOfRacks();
+        .getNumOfNonEmptyRacks();
     ErasureCodingPolicy[] enabledEcPolicies =
         getErasureCodingPolicyManager().getCopyOfEnabledPolicies();
     return ECTopologyVerifier
@@ -8734,4 +8734,3 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 }
-
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java

@@ -19,24 +19,35 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.net.StaticMapping;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -167,6 +178,108 @@ public class TestBlockPlacementPolicyRackFaultTolerant {
     }
   }
 
+  /**
+   * Verify decommissioning a dn which is the only node in its rack.
+   */
+  @Test
+  public void testPlacementWithOnlyOneNodeInRackDecommission() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final String[] racks = {"/RACK0", "/RACK0", "/RACK2", "/RACK3", "/RACK4", "/RACK5", "/RACK2"};
+    final String[] hosts = {"/host0", "/host1", "/host2", "/host3", "/host4", "/host5", "/host6"};
+
+    // enables DFSNetworkTopology
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+        DEFAULT_BLOCK_SIZE / 2);
+
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(7).racks(racks)
+        .hosts(hosts).build();
+    cluster.waitActive();
+    nameNodeRpc = cluster.getNameNodeRpc();
+    namesystem = cluster.getNamesystem();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    fs.enableErasureCodingPolicy("RS-3-2-1024k");
+    fs.setErasureCodingPolicy(new Path("/"), "RS-3-2-1024k");
+
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    final DatanodeManager dm = bm.getDatanodeManager();
+    assertTrue(dm.getNetworkTopology() instanceof DFSNetworkTopology);
+
+    String clientMachine = "/host4";
+    String clientRack = "/RACK4";
+    String src = "/test";
+
+    final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
+    DatanodeDescriptor dnd4 = dnm.getDatanode(cluster.getDataNodes().get(4).getDatanodeId());
+    assertEquals(dnd4.getNetworkLocation(), clientRack);
+    dnm.getDatanodeAdminManager().startDecommission(dnd4);
+    short replication = 5;
+    short additionalReplication = 1;
+
+    try {
+      // Create the file with client machine
+      HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
+          clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
+          replication, DEFAULT_BLOCK_SIZE * 1024 * 10, null, null, null, false);
+
+      //test chooseTarget for new file
+      LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
+          null, null, fileStatus.getFileId(), null, null);
+      HashMap<String, Integer> racksCount = new HashMap<String, Integer>();
+      doTestLocatedBlockRacks(racksCount, replication, 4, locatedBlock);
+
+      //test chooseTarget for existing file.
+      LocatedBlock additionalLocatedBlock =
+          nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(),
+              locatedBlock.getBlock(), locatedBlock.getLocations(),
+              locatedBlock.getStorageIDs(), DatanodeInfo.EMPTY_ARRAY,
+              additionalReplication, clientMachine);
+
+      racksCount.clear();
+      doTestLocatedBlockRacks(racksCount, additionalReplication + replication,
+          4, additionalLocatedBlock);
+      assertEquals(racksCount.get("/RACK0"), (Integer)2);
+      assertEquals(racksCount.get("/RACK2"), (Integer)2);
+    } finally {
+      dnm.getDatanodeAdminManager().stopDecommission(dnd4);
+    }
+
+    //test if decommission succeeded
+    DatanodeDescriptor dnd3 = dnm.getDatanode(cluster.getDataNodes().get(3).getDatanodeId());
+    cluster.getNamesystem().writeLock();
+    try {
+      dm.getDatanodeAdminManager().startDecommission(dnd3);
+    } finally {
+      cluster.getNamesystem().writeUnlock();
+    }
+
+    // make sure the decommission finishes and the block is on 4 racks
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return dnd3.isDecommissioned();
+      }
+    }, 1000, 10 * 1000);
+
+    LocatedBlocks locatedBlocks =
+        cluster.getFileSystem().getClient().getLocatedBlocks(
+            src, 0, DEFAULT_BLOCK_SIZE);
+    assertEquals(4, bm.getDatanodeManager().
+        getNetworkTopology().getNumOfNonEmptyRacks());
+    for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
+      BlockPlacementStatus status = bm.getStriptedBlockPlacementPolicy()
          .verifyBlockPlacement(block.getLocations(), 5);
+      Assert.assertTrue(status.isPlacementPolicySatisfied());
+    }
+  }
+
   private void shuffle(DatanodeInfo[] locs, String[] storageIDs) {
     int length = locs.length;
     Object[][] pairs = new Object[length][];
@@ -198,6 +311,17 @@ public class TestBlockPlacementPolicyRackFaultTolerant {
     assertTrue(maxCount - minCount <= 1);
   }
 
+  private void doTestLocatedBlockRacks(HashMap<String, Integer> racksCount, int replication,
+      int validracknum, LocatedBlock locatedBlock) {
+    assertEquals(replication, locatedBlock.getLocations().length);
+
+    for (DatanodeInfo node :
+        locatedBlock.getLocations()) {
+      addToRacksCount(node.getNetworkLocation(), racksCount);
+    }
+    assertEquals(validracknum, racksCount.size());
+  }
+
   private void addToRacksCount(String rack, HashMap<String, Integer> racksCount) {
     Integer count = racksCount.get(rack);
     if (count == null) {
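As a reading aid for the assertions in the new test: once DN4, the only node on /RACK4, starts decommissioning, the non-empty racks are /RACK0, /RACK2, /RACK3 and /RACK5, and /RACK0 and /RACK2 are the only racks with two live datanodes. A worked tally (illustrative arithmetic, not part of the patch):

int replicas = 5;            // first chooseTarget: an RS-3-2 block group
int usableRacks = 4;         // /RACK0, /RACK2, /RACK3, /RACK5
// 5 locations over 4 racks, spread as evenly as possible -> {2, 1, 1, 1},
// so doTestLocatedBlockRacks(racksCount, 5, 4, locatedBlock) sees 4 racks.

int afterAdd = replicas + 1; // getAdditionalDatanode adds a 6th location
// 6 over 4 racks -> {2, 2, 1, 1}; only /RACK0 and /RACK2 can hold two,
// hence assertEquals(racksCount.get("/RACK0"), (Integer) 2) and likewise
// for /RACK2.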
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java

@@ -633,4 +633,20 @@ public class TestNetworkTopology {
     numNodes = cluster.countNumOfAvailableNodes(NodeBase.ROOT, excludedNodes);
     assertEquals(12, numNodes);
   }
+
+  @Test
+  public void testAddAndRemoveNodeWithEmptyRack() {
+    DatanodeDescriptor n1 = DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3");
+    DatanodeDescriptor n2 = DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3");
+    DatanodeDescriptor n3 = DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3");
+
+    cluster.decommissionNode(n1);
+    assertEquals(6, cluster.getNumOfNonEmptyRacks());
+    cluster.decommissionNode(n2);
+    cluster.decommissionNode(n3);
+    assertEquals(5, cluster.getNumOfNonEmptyRacks());
+
+    cluster.recommissionNode(n1);
+    assertEquals(6, cluster.getNumOfNonEmptyRacks());
+  }
 }