From 97c2e576c91c2316c2b52bfc948bae9bff8ca49f Mon Sep 17 00:00:00 2001
From: Yiqun Lin
Date: Fri, 5 May 2017 11:54:50 +0800
Subject: [PATCH] HDFS-11530. Use HDFS specific network topology to choose datanode in BlockPlacementPolicyDefault. Contributed by Yiqun Lin and Chen Liang.

---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +
 .../hadoop/hdfs/net/DFSNetworkTopology.java | 24 ++-
 .../hadoop/hdfs/net/DFSTopologyNodeImpl.java | 137 ++++++++++++++++++
 .../BlockPlacementPolicyDefault.java | 36 ++++-
 .../blockmanagement/DatanodeDescriptor.java | 36 ++++-
 .../blockmanagement/DatanodeManager.java | 19 ++-
 .../src/main/resources/hdfs-default.xml | 8 +
 .../TestDefaultBlockPlacementPolicy.java | 46 ++++++
 8 files changed, 302 insertions(+), 8 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0ca344c8def..b95c7e6d32c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1085,6 +1085,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "httpfs.buffer.size";
   public static final int HTTP_BUFFER_SIZE_DEFAULT = 4096;
 
+  public static final String DFS_USE_DFS_NETWORK_TOPOLOGY_KEY =
+      "dfs.use.dfs.network.topology";
+  public static final boolean DFS_USE_DFS_NETWORK_TOPOLOGY_DEFAULT = false;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
   @Deprecated
   public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java
index 259e2759dbd..e74cdecbd4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs.net;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
@@ -204,10 +206,24 @@ public class DFSNetworkTopology extends NetworkTopology {
     }
     if (excludedNodes != null) {
       for (Node excludedNode : excludedNodes) {
-        // all excluded nodes should be DatanodeDescriptor
-        Preconditions.checkArgument(excludedNode instanceof DatanodeDescriptor);
-        availableCount -= ((DatanodeDescriptor) excludedNode)
-            .hasStorageType(type) ? 1 : 0;
+        if (excludedNode instanceof DatanodeDescriptor) {
+          availableCount -= ((DatanodeDescriptor) excludedNode)
+              .hasStorageType(type) ? 1 : 0;
+        } else if (excludedNode instanceof DFSTopologyNodeImpl) {
+          availableCount -= ((DFSTopologyNodeImpl) excludedNode)
+              .getSubtreeStorageCount(type);
+        } else if (excludedNode instanceof DatanodeInfo) {
+          // find out the corresponding DatanodeDescriptor object, because
+          // we need to get its storage type info.
+          // could be an expensive operation; fortunately the size of the
+          // excluded nodes set is supposed to be very small.
+          String nodeLocation = excludedNode.getNetworkLocation()
+              + "/" + excludedNode.getName();
+          DatanodeDescriptor dn = (DatanodeDescriptor)getNode(nodeLocation);
+          availableCount -= dn.hasStorageType(type)? 1 : 0;
+        } else {
+          LOG.error("Unexpected node type: {}.", excludedNode.getClass());
+        }
       }
     }
     if (availableCount <= 0) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java
index 6d80db5af89..002f4fcee04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java
@@ -18,11 +18,14 @@
 package org.apache.hadoop.hdfs.net;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.net.InnerNode;
 import org.apache.hadoop.net.InnerNodeImpl;
 import org.apache.hadoop.net.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.EnumMap;
 import java.util.EnumSet;
@@ -36,6 +39,9 @@ import java.util.HashMap;
  */
 public class DFSTopologyNodeImpl extends InnerNodeImpl {
 
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DFSTopologyNodeImpl.class);
+
   static final InnerNodeImpl.Factory FACTORY
       = new DFSTopologyNodeImpl.Factory();
 
@@ -127,8 +133,68 @@ public class DFSTopologyNodeImpl extends InnerNodeImpl {
     }
   }
 
+  /**
+   * Called when add() is called to add a node that already exists.
+   *
+   * In normal execution, nodes are added only once and this should not happen.
+   * However, if a node restarts, we may run into the case where the same node
+   * tries to add itself again with potentially different storage type info.
+   * In this case this method will update the metadata according to the new
+   * storage info.
+   *
+   * Note that it is important to also update all the ancestors if we have
+   * updated the local node storage info.
+   *
+   * @param dnDescriptor the node that is added another time, with potentially
+   * different storage types.
+   */
+  private void updateExistingDatanode(DatanodeDescriptor dnDescriptor) {
+    if (childrenStorageInfo.containsKey(dnDescriptor.getName())) {
+      // all existing nodes should have an entry in childrenStorageInfo
+      boolean same = dnDescriptor.getStorageTypes().size()
+          == childrenStorageInfo.get(dnDescriptor.getName()).keySet().size();
+      for (StorageType type :
+          childrenStorageInfo.get(dnDescriptor.getName()).keySet()) {
+        same = same && dnDescriptor.hasStorageType(type);
+      }
+      if (same) {
+        // if the storage types have not changed, do nothing.
+        return;
+      }
+      // not same means we need to update the storage info.
+      DFSTopologyNodeImpl parent = (DFSTopologyNodeImpl)getParent();
+      for (StorageType type :
+          childrenStorageInfo.get(dnDescriptor.getName()).keySet()) {
+        if (!dnDescriptor.hasStorageType(type)) {
+          // remove this type, because the new storage info does not have it.
+          // also need to decrement the count for all the ancestors.
+          // since this is the parent of n, where n is a datanode,
+          // the map must have 1 as the value of all keys
+          childrenStorageInfo.get(dnDescriptor.getName()).remove(type);
+          decStorageTypeCount(type);
+          if (parent != null) {
+            parent.childRemoveStorage(getName(), type);
+          }
+        }
+      }
+      for (StorageType type : dnDescriptor.getStorageTypes()) {
+        if (!childrenStorageInfo.get(dnDescriptor.getName())
+            .containsKey(type)) {
+          // there is a new type in the new storage info; add it locally,
+          // as well as to all ancestors.
+          childrenStorageInfo.get(dnDescriptor.getName()).put(type, 1);
+          incStorageTypeCount(type);
+          if (parent != null) {
+            parent.childAddStorage(getName(), type);
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public boolean add(Node n) {
+    LOG.debug("adding node {}", n.getName());
     if (!isAncestor(n)) {
       throw new IllegalArgumentException(n.getName()
           + ", which is located at " + n.getNetworkLocation()
@@ -149,6 +215,7 @@ public class DFSTopologyNodeImpl extends InnerNodeImpl {
     for(int i=0; i<children.size(); i++) {
+  /**
+   * Called by a child node of the current node to increment a storage count.
+   *
+   * @param childName the name of the child adding a storage type.
+   * @param type the type being added.
+   */
+  public synchronized void childAddStorage(
+      String childName, StorageType type) {
+    LOG.debug("child add storage: {}:{}", childName, type);
+    Preconditions.checkArgument(childrenStorageInfo.containsKey(childName));
+    EnumMap<StorageType, Integer> typeCount =
+        childrenStorageInfo.get(childName);
+    if (typeCount.containsKey(type)) {
+      typeCount.put(type, typeCount.get(type) + 1);
+    } else {
+      // Please be aware that the counts are always "number of datanodes in
+      // this subtree" rather than "number of storages in this subtree".
+      // So if the caller is a datanode, it should always be this branch
+      // rather than the +1 branch above. This depends on the caller in
+      // DatanodeDescriptor to make sure it calls this only when a *new*
+      // storage type is added (it should not call this when an already
+      // existing storage is added). But there is no such restriction for
+      // inner nodes.
+      typeCount.put(type, 1);
+    }
+    if (storageTypeCounts.containsKey(type)) {
+      storageTypeCounts.put(type, storageTypeCounts.get(type) + 1);
+    } else {
+      storageTypeCounts.put(type, 1);
+    }
+    if (getParent() != null) {
+      ((DFSTopologyNodeImpl)getParent()).childAddStorage(getName(), type);
+    }
+  }
+
+  /**
+   * Called by a child node of the current node to decrement a storage count.
+   *
+   * @param childName the name of the child removing a storage type.
+   * @param type the type being removed.
+   */
+  public synchronized void childRemoveStorage(
+      String childName, StorageType type) {
+    LOG.debug("child remove storage: {}:{}", childName, type);
+    Preconditions.checkArgument(childrenStorageInfo.containsKey(childName));
+    EnumMap<StorageType, Integer> typeCount =
+        childrenStorageInfo.get(childName);
+    Preconditions.checkArgument(typeCount.containsKey(type));
+    if (typeCount.get(type) > 1) {
+      typeCount.put(type, typeCount.get(type) - 1);
+    } else {
+      typeCount.remove(type);
+    }
+    Preconditions.checkArgument(storageTypeCounts.containsKey(type));
+    if (storageTypeCounts.get(type) > 1) {
+      storageTypeCounts.put(type, storageTypeCounts.get(type) - 1);
+    } else {
+      storageTypeCounts.remove(type);
+    }
+    if (getParent() != null) {
+      ((DFSTopologyNodeImpl)getParent()).childRemoveStorage(getName(), type);
+    }
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 767633485e6..a245f0c0060 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
@@ -713,7 +714,22 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     boolean badTarget = false;
     DatanodeStorageInfo firstChosen = null;
     while (numOfReplicas > 0) {
-      DatanodeDescriptor chosenNode = chooseDataNode(scope, excludedNodes);
+      // the storage type that the current node has
+      StorageType includeType = null;
+      DatanodeDescriptor chosenNode = null;
+      if (clusterMap instanceof DFSNetworkTopology) {
+        for (StorageType type : storageTypes.keySet()) {
+          chosenNode = chooseDataNode(scope, excludedNodes, type);
+
+          if (chosenNode != null) {
+            includeType = type;
+            break;
+          }
+        }
+      } else {
+        chosenNode = chooseDataNode(scope, excludedNodes);
+      }
+
       if (chosenNode == null) {
         break;
       }
@@ -729,6 +745,13 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
          .entrySet().iterator(); iter.hasNext();) {
        Map.Entry<StorageType, Integer> entry = iter.next();
+
+        // If the chosen node already contains one of the required storage
+        // types, there is no need to loop through the other storage types.
+        if (includeType != null && entry.getKey() != includeType) {
+          continue;
+        }
+
         storage = chooseStorage4Block(
             chosenNode, blocksize, results, entry.getKey());
         if (storage != null) {
@@ -781,6 +804,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     return (DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNodes);
   }
 
+  /**
+   * Choose a datanode from the given scope with the specified
+   * storage type.
+   * @return the chosen node, if there is any.
+   */
+  protected DatanodeDescriptor chooseDataNode(final String scope,
+      final Collection<Node> excludedNodes, StorageType type) {
+    return (DatanodeDescriptor) ((DFSNetworkTopology) clusterMap)
+        .chooseRandomWithStorageTypeTwoTrial(scope, excludedNodes, type);
+  }
+
   /**
    * Choose a good storage of given storage type from datanode, and add it to
    * the result list.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index d0583b3e589..4b87fd43534 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -35,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.net.DFSTopologyNodeImpl;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -494,7 +495,16 @@ public class DatanodeDescriptor extends DatanodeInfo {
     // blocks.
     for (final DatanodeStorageInfo storageInfo : excessStorages.values()) {
       if (storageInfo.numBlocks() == 0) {
-        storageMap.remove(storageInfo.getStorageID());
+        DatanodeStorageInfo info =
+            storageMap.remove(storageInfo.getStorageID());
+        if (!hasStorageType(info.getStorageType())) {
+          // we removed a storage, and as a result there is no more such
+          // storage type; inform the parent about this.
+          if (getParent() instanceof DFSTopologyNodeImpl) {
+            ((DFSTopologyNodeImpl) getParent()).childRemoveStorage(getName(),
+                info.getStorageType());
+          }
+        }
         LOG.info("Removed storage {} from DataNode {}", storageInfo, this);
       } else {
         // This can occur until all block reports are received.
@@ -911,9 +921,20 @@ public class DatanodeDescriptor extends DatanodeInfo {
   DatanodeStorageInfo updateStorage(DatanodeStorage s) {
     synchronized (storageMap) {
       DatanodeStorageInfo storage = storageMap.get(s.getStorageID());
+      DFSTopologyNodeImpl parent = null;
+      if (getParent() instanceof DFSTopologyNodeImpl) {
+        parent = (DFSTopologyNodeImpl) getParent();
+      }
+
       if (storage == null) {
         LOG.info("Adding new storage ID {} for DN {}", s.getStorageID(),
             getXferAddr());
+        StorageType type = s.getStorageType();
+        if (!hasStorageType(type) && parent != null) {
+          // we are about to add a type this node currently does not have;
+          // inform the parent that a new type is added to this datanode
+          parent.childAddStorage(getName(), s.getStorageType());
+        }
         storage = new DatanodeStorageInfo(this, s);
         storageMap.put(s.getStorageID(), storage);
       } else if (storage.getState() != s.getState() ||
@@ -921,8 +942,21 @@
         // For backwards compatibility, make sure that the type and
         // state are updated. Some reports from older datanodes do
         // not include these fields so we may have assumed defaults.
+        StorageType oldType = storage.getStorageType();
+        StorageType newType = s.getStorageType();
+        if (oldType != newType && !hasStorageType(newType) && parent != null) {
+          // we are about to add a type this node currently does not have;
+          // inform the parent that a new type is added to this datanode.
+          // if old == new, nothing has changed, so don't bother.
+          parent.childAddStorage(getName(), newType);
+        }
         storage.updateFromStorage(s);
         storageMap.put(storage.getStorageID(), storage);
+        if (oldType != newType && !hasStorageType(oldType) && parent != null) {
+          // there is no more old-type storage on this datanode; inform the
+          // parent about this change.
+          parent.childRemoveStorage(getName(), oldType);
+        }
       }
       return storage;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index a61aa78c283..7dcc9fd276c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -186,6 +187,11 @@ public class DatanodeManager {
    */
   private final boolean dataNodeDiskStatsEnabled;
 
+  /**
+   * Whether to use DFSNetworkTopology to choose nodes for placing replicas.
+   */
+  private final boolean useDfsNetworkTopology;
+
   @Nullable
   private final SlowPeerTracker slowPeerTracker;
   @Nullable
@@ -205,8 +211,17 @@
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
-
-    networktopology = NetworkTopology.getInstance(conf);
+
+    // TODO: Enable DFSNetworkTopology by default after more stress
+    // testing and validation.
+    this.useDfsNetworkTopology = conf.getBoolean(
+        DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY,
+        DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_DEFAULT);
+    if (useDfsNetworkTopology) {
+      networktopology = DFSNetworkTopology.getInstance(conf);
+    } else {
+      networktopology = NetworkTopology.getInstance(conf);
+    }
     this.heartbeatManager = new HeartbeatManager(namesystem,
         blockManager, conf);
     this.decomManager = new DecommissionManager(namesystem, blockManager,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 0f33b709af8..f0f2220f759 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4505,4 +4505,12 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.use.dfs.network.topology</name>
+  <value>false</value>
+  <description>
+    Enables DFSNetworkTopology to choose nodes for placing replicas.
+  </description>
+</property>
+
 </configuration>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
index 0931ff44ade..eab1199d976 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -97,6 +98,51 @@ public class TestDefaultBlockPlacementPolicy {
     testPlacement(clientMachine, "/RACK3", true);
   }
 
+  /**
+   * Verify local node selection when using DFSNetworkTopology.
+   */
+  @Test
+  public void testPlacementWithDFSNetworkTopology() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final String[] racks = {"/RACK0", "/RACK0", "/RACK2", "/RACK3", "/RACK2"};
+    final String[] hosts = {"/host0", "/host1", "/host2", "/host3", "/host4"};
+
+    // enables DFSNetworkTopology
+    conf.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+        DEFAULT_BLOCK_SIZE / 2);
+
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).racks(racks)
+        .hosts(hosts).build();
+    cluster.waitActive();
+    nameNodeRpc = cluster.getNameNodeRpc();
+    namesystem = cluster.getNamesystem();
+
+    DatanodeManager dm = namesystem.getBlockManager().getDatanodeManager();
+    assertTrue(dm.getNetworkTopology() instanceof DFSNetworkTopology);
+
+    String clientMachine = "/host3";
+    String clientRack = "/RACK3";
+    String src = "/test";
+    // Create the file with client machine
+    HdfsFileStatus fileStatus = namesystem.startFile(src, perm, clientMachine,
+        clientMachine, EnumSet.of(CreateFlag.CREATE), true, REPLICATION_FACTOR,
+        DEFAULT_BLOCK_SIZE, null, null, false);
+    LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, null,
+        null, fileStatus.getFileId(), null, null);
+
+    assertEquals("Block should be allocated sufficient locations",
+        REPLICATION_FACTOR, locatedBlock.getLocations().length);
+    assertEquals("First datanode should be rack local", clientRack,
+        locatedBlock.getLocations()[0].getNetworkLocation());
+    nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(),
+        src, clientMachine);
+  }
+
   /**
    * Verify decommissioned nodes should not be selected.
    */
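Note on enabling the feature (an illustration, not part of the patch): the new code path is off by default, per DFS_USE_DFS_NETWORK_TOPOLOGY_DEFAULT above. Assuming a standard hdfs-site.xml deployment, a minimal fragment for opting in on a test cluster could look like this:

<configuration>
  <property>
    <name>dfs.use.dfs.network.topology</name>
    <value>true</value>
  </property>
</configuration>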
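The DFSTopologyNodeImpl changes keep two structures in sync on every inner node: a per-child EnumMap of storage-type counts and an aggregate count for the whole subtree, with childAddStorage/childRemoveStorage propagating each change to the parent. The standalone Java sketch below shows that counting scheme in isolation; the class and member names are illustrative stand-ins, not the HDFS classes.

import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

/**
 * Standalone sketch of the per-child storage-type counting kept by an inner
 * topology node. Names are illustrative; this is not the HDFS implementation.
 */
public class StorageCountingSketch {

  enum StorageType { DISK, SSD, ARCHIVE, RAM_DISK }

  private final String name;
  private final StorageCountingSketch parent;
  // child name -> (storage type -> number of datanodes under that child
  // that have at least one storage of that type)
  private final Map<String, EnumMap<StorageType, Integer>> childrenStorageInfo =
      new HashMap<>();
  // aggregate for this subtree: storage type -> datanode count
  private final EnumMap<StorageType, Integer> storageTypeCounts =
      new EnumMap<>(StorageType.class);

  StorageCountingSketch(String name, StorageCountingSketch parent) {
    this.name = name;
    this.parent = parent;
  }

  synchronized void childAddStorage(String childName, StorageType type) {
    childrenStorageInfo
        .computeIfAbsent(childName, k -> new EnumMap<>(StorageType.class))
        .merge(type, 1, Integer::sum);
    storageTypeCounts.merge(type, 1, Integer::sum);
    if (parent != null) {
      // propagate the change to the ancestors, as the patch does
      parent.childAddStorage(name, type);
    }
  }

  synchronized void childRemoveStorage(String childName, StorageType type) {
    EnumMap<StorageType, Integer> typeCount = childrenStorageInfo.get(childName);
    if (typeCount == null || !typeCount.containsKey(type)) {
      return; // the real code asserts here instead of returning silently
    }
    // drop the entry entirely once the count reaches zero
    if (typeCount.merge(type, -1, Integer::sum) <= 0) {
      typeCount.remove(type);
    }
    if (storageTypeCounts.merge(type, -1, Integer::sum) <= 0) {
      storageTypeCounts.remove(type);
    }
    if (parent != null) {
      parent.childRemoveStorage(name, type);
    }
  }

  int getSubtreeStorageCount(StorageType type) {
    return storageTypeCounts.getOrDefault(type, 0);
  }

  public static void main(String[] args) {
    StorageCountingSketch root = new StorageCountingSketch("root", null);
    StorageCountingSketch rack = new StorageCountingSketch("rack0", root);
    rack.childAddStorage("dn1", StorageType.DISK);
    rack.childAddStorage("dn2", StorageType.DISK);
    rack.childAddStorage("dn2", StorageType.SSD);
    System.out.println(rack.getSubtreeStorageCount(StorageType.DISK)); // 2
    System.out.println(root.getSubtreeStorageCount(StorageType.SSD)); // 1
    rack.childRemoveStorage("dn2", StorageType.SSD);
    System.out.println(root.getSubtreeStorageCount(StorageType.SSD)); // 0
  }
}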
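In the BlockPlacementPolicyDefault change, when clusterMap is a DFSNetworkTopology the loop first asks the topology for a node carrying one of the still-required storage types, and then only charges that type ("includeType") against the requirements. The following simplified control-flow sketch makes that explicit; the chooser interface and names are hypothetical stand-ins for chooseRandomWithStorageTypeTwoTrial, not the real API.

import java.util.EnumMap;

/**
 * Simplified sketch of the storage-type-aware selection loop; the types and
 * chooser interface are illustrative, not BlockPlacementPolicyDefault itself.
 */
public class TypeAwarePlacementSketch {

  enum StorageType { DISK, SSD }

  /** Stand-in for a topology that can choose a node with a given storage type. */
  interface TypeAwareChooser {
    String chooseRandom(String scope, StorageType type);
  }

  static String chooseOneTarget(EnumMap<StorageType, Integer> storageTypes,
      TypeAwareChooser topology, String scope) {
    StorageType includeType = null;
    String chosenNode = null;
    // Ask for a node that is known to carry one of the still-required
    // storage types; stop at the first hit.
    for (StorageType type : storageTypes.keySet()) {
      chosenNode = topology.chooseRandom(scope, type);
      if (chosenNode != null) {
        includeType = type;
        break;
      }
    }
    if (chosenNode == null) {
      return null;
    }
    // Only decrement the requirement for the type the node was chosen for,
    // mirroring the includeType check in the patched loop.
    int remaining = storageTypes.get(includeType) - 1;
    if (remaining == 0) {
      storageTypes.remove(includeType);
    } else {
      storageTypes.put(includeType, remaining);
    }
    return chosenNode;
  }

  public static void main(String[] args) {
    EnumMap<StorageType, Integer> required = new EnumMap<>(StorageType.class);
    required.put(StorageType.DISK, 2);
    required.put(StorageType.SSD, 1);
    // Toy chooser: pretend only DISK nodes are available in the scope.
    TypeAwareChooser chooser =
        (scope, type) -> type == StorageType.DISK ? "dn-" + type : null;
    System.out.println(chooseOneTarget(required, chooser, "/rack0")); // dn-DISK
    System.out.println(required); // {DISK=1, SSD=1}
  }
}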