HDFS-11530. Use HDFS specific network topology to choose datanode in BlockPlacementPolicyDefault. Contributed by Yiqun Lin and Chen Liang.

This commit is contained in:
Yiqun Lin 2017-05-05 11:54:50 +08:00
parent 3082552b3b
commit 97c2e576c9
8 changed files with 302 additions and 8 deletions

View File

@ -1085,6 +1085,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"httpfs.buffer.size";
public static final int HTTP_BUFFER_SIZE_DEFAULT = 4096;
public static final String DFS_USE_DFS_NETWORK_TOPOLOGY_KEY =
"dfs.use.dfs.network.topology";
public static final boolean DFS_USE_DFS_NETWORK_TOPOLOGY_DEFAULT = false;
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

View File

@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs.net;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
@ -204,10 +206,24 @@ public class DFSNetworkTopology extends NetworkTopology {
}
if (excludedNodes != null) {
for (Node excludedNode : excludedNodes) {
// all excluded nodes should be DatanodeDescriptor
Preconditions.checkArgument(excludedNode instanceof DatanodeDescriptor);
if (excludedNode instanceof DatanodeDescriptor) {
availableCount -= ((DatanodeDescriptor) excludedNode)
.hasStorageType(type) ? 1 : 0;
} else if (excludedNode instanceof DFSTopologyNodeImpl) {
availableCount -= ((DFSTopologyNodeImpl) excludedNode)
.getSubtreeStorageCount(type);
} else if (excludedNode instanceof DatanodeInfo) {
// find out the corresponding DatanodeDescriptor object, beacuse
// we need to get its storage type info.
// could be expensive operation, fortunately the size of excluded
// nodes set is supposed to be very small.
String nodeLocation = excludedNode.getNetworkLocation()
+ "/" + excludedNode.getName();
DatanodeDescriptor dn = (DatanodeDescriptor)getNode(nodeLocation);
availableCount -= dn.hasStorageType(type)? 1 : 0;
} else {
LOG.error("Unexpected node type: {}.", excludedNode.getClass());
}
}
}
if (availableCount <= 0) {

View File

@ -18,11 +18,14 @@
package org.apache.hadoop.hdfs.net;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.net.InnerNode;
import org.apache.hadoop.net.InnerNodeImpl;
import org.apache.hadoop.net.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.EnumMap;
import java.util.EnumSet;
@ -36,6 +39,9 @@ import java.util.HashMap;
*/
public class DFSTopologyNodeImpl extends InnerNodeImpl {
public static final Logger LOG =
LoggerFactory.getLogger(DFSTopologyNodeImpl.class);
static final InnerNodeImpl.Factory FACTORY
= new DFSTopologyNodeImpl.Factory();
@ -127,8 +133,68 @@ public class DFSTopologyNodeImpl extends InnerNodeImpl {
}
}
/**
* Called when add() is called to add a node that already exist.
*
* In normal execution, nodes are added only once and this should not happen.
* However if node restarts, we may run into the case where the same node
* tries to add itself again with potentially different storage type info.
* In this case this method will update the meta data according to the new
* storage info.
*
* Note that it is important to also update all the ancestors if we do have
* updated the local node storage info.
*
* @param dnDescriptor the node that is added another time, with potentially
* different storage types.
*/
private void updateExistingDatanode(DatanodeDescriptor dnDescriptor) {
if (childrenStorageInfo.containsKey(dnDescriptor.getName())) {
// all existing node should have an entry in childrenStorageInfo
boolean same = dnDescriptor.getStorageTypes().size()
== childrenStorageInfo.get(dnDescriptor.getName()).keySet().size();
for (StorageType type :
childrenStorageInfo.get(dnDescriptor.getName()).keySet()) {
same = same && dnDescriptor.hasStorageType(type);
}
if (same) {
// if the storage type hasn't been changed, do nothing.
return;
}
// not same means we need to update the storage info.
DFSTopologyNodeImpl parent = (DFSTopologyNodeImpl)getParent();
for (StorageType type :
childrenStorageInfo.get(dnDescriptor.getName()).keySet()) {
if (!dnDescriptor.hasStorageType(type)) {
// remove this type, because the new storage info does not have it.
// also need to remove decrement the count for all the ancestors.
// since this is the parent of n, where n is a datanode,
// the map must have 1 as the value of all keys
childrenStorageInfo.get(dnDescriptor.getName()).remove(type);
decStorageTypeCount(type);
if (parent != null) {
parent.childRemoveStorage(getName(), type);
}
}
}
for (StorageType type : dnDescriptor.getStorageTypes()) {
if (!childrenStorageInfo.get(dnDescriptor.getName())
.containsKey(type)) {
// there is a new type in new storage info, add this locally,
// as well as all ancestors.
childrenStorageInfo.get(dnDescriptor.getName()).put(type, 1);
incStorageTypeCount(type);
if (parent != null) {
parent.childAddStorage(getName(), type);
}
}
}
}
}
@Override
public boolean add(Node n) {
LOG.debug("adding node {}", n.getName());
if (!isAncestor(n)) {
throw new IllegalArgumentException(n.getName()
+ ", which is located at " + n.getNetworkLocation()
@ -149,6 +215,7 @@ public class DFSTopologyNodeImpl extends InnerNodeImpl {
for(int i=0; i<children.size(); i++) {
if (children.get(i).getName().equals(n.getName())) {
children.set(i, n);
updateExistingDatanode(dnDescriptor);
return false;
}
}
@ -227,6 +294,7 @@ public class DFSTopologyNodeImpl extends InnerNodeImpl {
@Override
public boolean remove(Node n) {
LOG.debug("removing node {}", n.getName());
if (!isAncestor(n)) {
throw new IllegalArgumentException(n.getName()
+ ", which is located at " + n.getNetworkLocation()
@ -299,4 +367,73 @@ public class DFSTopologyNodeImpl extends InnerNodeImpl {
return isRemoved;
}
}
/**
* Called by a child node of the current node to increment a storage count.
*
* lock is needed as different datanodes may call recursively to modify
* the same parent.
* TODO : this may not happen at all, depending on how heartheat is processed
* @param childName the name of the child that tries to add the storage type
* @param type the type being incremented.
*/
public synchronized void childAddStorage(
String childName, StorageType type) {
LOG.debug("child add storage: {}:{}", childName, type);
// childrenStorageInfo should definitely contain this node already
// because updateStorage is called after node added
Preconditions.checkArgument(childrenStorageInfo.containsKey(childName));
EnumMap<StorageType, Integer> typeCount =
childrenStorageInfo.get(childName);
if (typeCount.containsKey(type)) {
typeCount.put(type, typeCount.get(type) + 1);
} else {
// Please be aware that, the counts are always "number of datanodes in
// this subtree" rather than "number of storages in this storage".
// so if the caller is a datanode, it should always be this branch rather
// than the +1 branch above. This depends on the caller in
// DatanodeDescriptor to make sure only when a *new* storage type is added
// it calls this. (should not call this when a already existing storage
// is added).
// but no such restriction for inner nodes.
typeCount.put(type, 1);
}
if (storageTypeCounts.containsKey(type)) {
storageTypeCounts.put(type, storageTypeCounts.get(type) + 1);
} else {
storageTypeCounts.put(type, 1);
}
if (getParent() != null) {
((DFSTopologyNodeImpl)getParent()).childAddStorage(getName(), type);
}
}
/**
* Called by a child node of the current node to decrement a storage count.
*
* @param childName the name of the child removing a storage type.
* @param type the type being removed.
*/
public synchronized void childRemoveStorage(
String childName, StorageType type) {
LOG.debug("child remove storage: {}:{}", childName, type);
Preconditions.checkArgument(childrenStorageInfo.containsKey(childName));
EnumMap<StorageType, Integer> typeCount =
childrenStorageInfo.get(childName);
Preconditions.checkArgument(typeCount.containsKey(type));
if (typeCount.get(type) > 1) {
typeCount.put(type, typeCount.get(type) - 1);
} else {
typeCount.remove(type);
}
Preconditions.checkArgument(storageTypeCounts.containsKey(type));
if (storageTypeCounts.get(type) > 1) {
storageTypeCounts.put(type, storageTypeCounts.get(type) - 1);
} else {
storageTypeCounts.remove(type);
}
if (getParent() != null) {
((DFSTopologyNodeImpl)getParent()).childRemoveStorage(getName(), type);
}
}
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
@ -713,7 +714,22 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
boolean badTarget = false;
DatanodeStorageInfo firstChosen = null;
while (numOfReplicas > 0) {
DatanodeDescriptor chosenNode = chooseDataNode(scope, excludedNodes);
// the storage type that current node has
StorageType includeType = null;
DatanodeDescriptor chosenNode = null;
if (clusterMap instanceof DFSNetworkTopology) {
for (StorageType type : storageTypes.keySet()) {
chosenNode = chooseDataNode(scope, excludedNodes, type);
if (chosenNode != null) {
includeType = type;
break;
}
}
} else {
chosenNode = chooseDataNode(scope, excludedNodes);
}
if (chosenNode == null) {
break;
}
@ -729,6 +745,13 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
.entrySet().iterator(); iter.hasNext();) {
Map.Entry<StorageType, Integer> entry = iter.next();
// If there is one storage type the node has already contained,
// then no need to loop through other storage type.
if (includeType != null && entry.getKey() != includeType) {
continue;
}
storage = chooseStorage4Block(
chosenNode, blocksize, results, entry.getKey());
if (storage != null) {
@ -781,6 +804,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
return (DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNodes);
}
/**
* Choose a datanode from the given <i>scope</i> with specified
* storage type.
* @return the chosen node, if there is any.
*/
protected DatanodeDescriptor chooseDataNode(final String scope,
final Collection<Node> excludedNodes, StorageType type) {
return (DatanodeDescriptor) ((DFSNetworkTopology) clusterMap)
.chooseRandomWithStorageTypeTwoTrial(scope, excludedNodes, type);
}
/**
* Choose a good storage of given storage type from datanode, and add it to
* the result list.

View File

@ -35,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.net.DFSTopologyNodeImpl;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -494,7 +495,16 @@ public class DatanodeDescriptor extends DatanodeInfo {
// blocks.
for (final DatanodeStorageInfo storageInfo : excessStorages.values()) {
if (storageInfo.numBlocks() == 0) {
DatanodeStorageInfo info =
storageMap.remove(storageInfo.getStorageID());
if (!hasStorageType(info.getStorageType())) {
// we removed a storage, and as result there is no more such storage
// type, inform the parent about this.
if (getParent() instanceof DFSTopologyNodeImpl) {
((DFSTopologyNodeImpl) getParent()).childRemoveStorage(getName(),
info.getStorageType());
}
}
LOG.info("Removed storage {} from DataNode {}", storageInfo, this);
} else {
// This can occur until all block reports are received.
@ -911,9 +921,20 @@ public class DatanodeDescriptor extends DatanodeInfo {
DatanodeStorageInfo updateStorage(DatanodeStorage s) {
synchronized (storageMap) {
DatanodeStorageInfo storage = storageMap.get(s.getStorageID());
DFSTopologyNodeImpl parent = null;
if (getParent() instanceof DFSTopologyNodeImpl) {
parent = (DFSTopologyNodeImpl) getParent();
}
if (storage == null) {
LOG.info("Adding new storage ID {} for DN {}", s.getStorageID(),
getXferAddr());
StorageType type = s.getStorageType();
if (!hasStorageType(type) && parent != null) {
// we are about to add a type this node currently does not have,
// inform the parent that a new type is added to this datanode
parent.childAddStorage(getName(), s.getStorageType());
}
storage = new DatanodeStorageInfo(this, s);
storageMap.put(s.getStorageID(), storage);
} else if (storage.getState() != s.getState() ||
@ -921,8 +942,21 @@ public class DatanodeDescriptor extends DatanodeInfo {
// For backwards compatibility, make sure that the type and
// state are updated. Some reports from older datanodes do
// not include these fields so we may have assumed defaults.
StorageType oldType = storage.getStorageType();
StorageType newType = s.getStorageType();
if (oldType != newType && !hasStorageType(newType) && parent != null) {
// we are about to add a type this node currently does not have
// inform the parent that a new type is added to this datanode
// if old == new, nothing's changed. don't bother
parent.childAddStorage(getName(), newType);
}
storage.updateFromStorage(s);
storageMap.put(storage.getStorageID(), storage);
if (oldType != newType && !hasStorageType(oldType) && parent != null) {
// there is no more old type storage on this datanode, inform parent
// about this change.
parent.childRemoveStorage(getName(), oldType);
}
}
return storage;
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@ -186,6 +187,11 @@ public class DatanodeManager {
*/
private final boolean dataNodeDiskStatsEnabled;
/**
* If we use DfsNetworkTopology to choose nodes for placing replicas.
*/
private final boolean useDfsNetworkTopology;
@Nullable
private final SlowPeerTracker slowPeerTracker;
@Nullable
@ -206,7 +212,16 @@ public class DatanodeManager {
this.namesystem = namesystem;
this.blockManager = blockManager;
// TODO: Enables DFSNetworkTopology by default after more stress
// testings/validations.
this.useDfsNetworkTopology = conf.getBoolean(
DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY,
DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_DEFAULT);
if (useDfsNetworkTopology) {
networktopology = DFSNetworkTopology.getInstance(conf);
} else {
networktopology = NetworkTopology.getInstance(conf);
}
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
this.decomManager = new DecommissionManager(namesystem, blockManager,

View File

@ -4505,4 +4505,12 @@
</description>
</property>
<property>
<name>dfs.use.dfs.network.topology</name>
<value>false</value>
<description>
Enables DFSNetworkTopology to choose nodes for placing replicas.
</description>
</property>
</configuration>

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -97,6 +98,51 @@ public class TestDefaultBlockPlacementPolicy {
testPlacement(clientMachine, "/RACK3", true);
}
/**
* Verify local node selection with using DFSNetworkTopology.
*/
@Test
public void testPlacementWithDFSNetworkTopology() throws Exception {
Configuration conf = new HdfsConfiguration();
final String[] racks = {"/RACK0", "/RACK0", "/RACK2", "/RACK3", "/RACK2"};
final String[] hosts = {"/host0", "/host1", "/host2", "/host3", "/host4"};
// enables DFSNetworkTopology
conf.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, true);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
DEFAULT_BLOCK_SIZE / 2);
if (cluster != null) {
cluster.shutdown();
}
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).racks(racks)
.hosts(hosts).build();
cluster.waitActive();
nameNodeRpc = cluster.getNameNodeRpc();
namesystem = cluster.getNamesystem();
DatanodeManager dm = namesystem.getBlockManager().getDatanodeManager();
assertTrue(dm.getNetworkTopology() instanceof DFSNetworkTopology);
String clientMachine = "/host3";
String clientRack = "/RACK3";
String src = "/test";
// Create the file with client machine
HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine,
clientMachine, EnumSet.of(CreateFlag.CREATE), true, REPLICATION_FACTOR,
DEFAULT_BLOCK_SIZE, null, null, false);
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, null,
null, fileStatus.getFileId(), null, null);
assertEquals("Block should be allocated sufficient locations",
REPLICATION_FACTOR, locatedBlock.getLocations().length);
assertEquals("First datanode should be rack local", clientRack,
locatedBlock.getLocations()[0].getNetworkLocation());
nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(),
src, clientMachine);
}
/**
* Verify decommissioned nodes should not be selected.
*/