From 75955e7c50c175ba31c6f3ed9cc2c46b12700cd6 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Sat, 3 May 2014 11:04:35 +0000 Subject: [PATCH] svn merge -c 1592179 from trunk for HDFS-5168. Add cross node dependency support to BlockPlacementPolicy. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1592181 13f79535-47bb-0310-9956-ffa450edef68 --- .../fs/CommonConfigurationKeysPublic.java | 2 + .../net/DNSToSwitchMappingWithDependency.java | 56 ++++++ .../apache/hadoop/net/ScriptBasedMapping.java | 22 ++- .../net/ScriptBasedMappingWithDependency.java | 178 ++++++++++++++++++ .../TestScriptBasedMappingWithDependency.java | 86 +++++++++ hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop/hdfs/protocol/DatanodeInfo.java | 19 ++ .../hadoop/hdfs/server/balancer/Balancer.java | 2 +- .../server/blockmanagement/BlockManager.java | 3 +- .../blockmanagement/BlockPlacementPolicy.java | 8 +- .../BlockPlacementPolicyDefault.java | 10 +- .../BlockPlacementPolicyWithNodeGroup.java | 39 +++- .../blockmanagement/DatanodeManager.java | 62 +++++- .../server/blockmanagement/Host2NodesMap.java | 40 +++- .../hdfs/server/namenode/NamenodeFsck.java | 6 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 41 +++- .../blockmanagement/BlockManagerTestUtil.java | 7 +- .../TestReplicationPolicyWithNodeGroup.java | 90 ++++++++- .../hadoop/hdfs/server/namenode/TestFsck.java | 7 + 19 files changed, 644 insertions(+), 37 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSToSwitchMappingWithDependency.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMappingWithDependency.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestScriptBasedMappingWithDependency.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 53e06b7b0f2..f0ae6d7a489 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -78,6 +78,8 @@ public class CommonConfigurationKeysPublic { /** See core-default.xml */ public static final String NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY = "net.topology.table.file.name"; + public static final String NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY = + "net.topology.dependency.script.file.name"; /** See core-default.xml */ public static final String FS_TRASH_CHECKPOINT_INTERVAL_KEY = diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSToSwitchMappingWithDependency.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSToSwitchMappingWithDependency.java new file mode 100644 index 00000000000..24ae1faaecc --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSToSwitchMappingWithDependency.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.net; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * An interface that must be implemented to allow pluggable + * DNS-name/IP-address to RackID resolvers. + * + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving + +public interface DNSToSwitchMappingWithDependency extends DNSToSwitchMapping { + /** + * Get a list of dependent DNS-names for a given DNS-name/IP-address. + * Dependent DNS-names fall into the same fault domain which must be + * taken into account when placing replicas. This is intended to be used for + * cross node group dependencies when node groups are not sufficient to + * distinguish data nodes by fault domains. In practice, this is needed when + * a compute server runs VMs which use shared storage (as opposite to + * directly attached storage). In this case data nodes fall in two different + * fault domains. One fault domain is defined by a compute server and + * the other is defined by storage. With node groups we can group data nodes + * either by server fault domain or by storage fault domain. However one of + * the fault domains cannot be handled and there we need to define cross node + * group dependencies. These dependencies are applied in block placement + * polices which ensure that no two replicas will be on two dependent nodes. + * @param name - host name or IP address of a data node. Input host name + * parameter must take a value of dfs.datanode.hostname config value if this + * config property is set. Otherwise FQDN of the data node is used. + * @return list of dependent host names. If dfs.datanode.hostname config + * property is set, then its value must be returned. + * Otherwise, FQDN is returned. 
+ */ + public List getDependency(String name); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java index 2d02e133618..3dcb61090da 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java @@ -45,7 +45,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; */ @InterfaceAudience.Public @InterfaceStability.Evolving -public final class ScriptBasedMapping extends CachedDNSToSwitchMapping { +public class ScriptBasedMapping extends CachedDNSToSwitchMapping { /** * Minimum number of arguments: {@value} @@ -63,6 +63,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping { */ static final String SCRIPT_FILENAME_KEY = CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY ; + /** * key to the argument count that the script supports * {@value} @@ -84,7 +85,15 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping { * */ public ScriptBasedMapping() { - super(new RawScriptBasedMapping()); + this(new RawScriptBasedMapping()); + } + + /** + * Create an instance from the given raw mapping + * @param rawMap raw DNSTOSwithMapping + */ + public ScriptBasedMapping(DNSToSwitchMapping rawMap) { + super(rawMap); } /** @@ -132,7 +141,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping { * This is the uncached script mapping that is fed into the cache managed * by the superclass {@link CachedDNSToSwitchMapping} */ - private static final class RawScriptBasedMapping + protected static class RawScriptBasedMapping extends AbstractDNSToSwitchMapping { private String scriptName; private int maxArgs; //max hostnames per call of the script @@ -176,7 +185,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping { return m; } - String output = runResolveCommand(names); + String output = runResolveCommand(names, scriptName); if (output != null) { StringTokenizer allSwitchInfo = new StringTokenizer(output); while (allSwitchInfo.hasMoreTokens()) { @@ -208,7 +217,8 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping { * @return null if the number of arguments is out of range, * or the output of the command. 
*/ - private String runResolveCommand(List args) { + protected String runResolveCommand(List args, + String commandScriptName) { int loopCount = 0; if (args.size() == 0) { return null; @@ -225,7 +235,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping { while (numProcessed != args.size()) { int start = maxArgs * loopCount; List cmdList = new ArrayList(); - cmdList.add(scriptName); + cmdList.add(commandScriptName); for (numProcessed = start; numProcessed < (start + maxArgs) && numProcessed < args.size(); numProcessed++) { cmdList.add(args.get(numProcessed)); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMappingWithDependency.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMappingWithDependency.java new file mode 100644 index 00000000000..8a0a0033fd2 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMappingWithDependency.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.net; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; + + +/** + * This class extends ScriptBasedMapping class and implements + * the {@link DNSToSwitchMappingWithDependency} interface using + * a script configured via the + * {@link CommonConfigurationKeys#NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY} option. + *
+ * It contains a static class RawScriptBasedMappingWithDependency + * that performs the getDependency work. + *
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ScriptBasedMappingWithDependency extends ScriptBasedMapping + implements DNSToSwitchMappingWithDependency { + /** + * key to the dependency script filename {@value} + */ + static final String DEPENDENCY_SCRIPT_FILENAME_KEY = + CommonConfigurationKeys.NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY; + + private Map> dependencyCache = + new ConcurrentHashMap>(); + + /** + * Create an instance with the default configuration. + *
+ * Calling {@link #setConf(Configuration)} will trigger a + * re-evaluation of the configuration settings and so be used to + * set up the mapping script. + */ + public ScriptBasedMappingWithDependency() { + super(new RawScriptBasedMappingWithDependency()); + } + + /** + * Get the cached mapping and convert it to its real type + * @return the inner raw script mapping. + */ + private RawScriptBasedMappingWithDependency getRawMapping() { + return (RawScriptBasedMappingWithDependency)rawMapping; + } + + @Override + public String toString() { + return "script-based mapping with " + getRawMapping().toString(); + } + + /** + * {@inheritDoc} + *
+ * This will get called in the superclass constructor, so a check is needed + * to ensure that the raw mapping is defined before trying to relaying a null + * configuration. + * @param conf + */ + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + getRawMapping().setConf(conf); + } + + /** + * Get dependencies in the topology for a given host + * @param name - host name for which we are getting dependency + * @return a list of hosts dependent on the provided host name + */ + @Override + public List getDependency(String name) { + //normalize all input names to be in the form of IP addresses + name = NetUtils.normalizeHostName(name); + + if (name==null) { + return Collections.emptyList(); + } + + List dependencies = dependencyCache.get(name); + if (dependencies == null) { + //not cached + dependencies = getRawMapping().getDependency(name); + if(dependencies != null) { + dependencyCache.put(name, dependencies); + } + } + + return dependencies; +} + + /** + * This is the uncached script mapping that is fed into the cache managed + * by the superclass {@link CachedDNSToSwitchMapping} + */ + private static final class RawScriptBasedMappingWithDependency + extends ScriptBasedMapping.RawScriptBasedMapping + implements DNSToSwitchMappingWithDependency { + private String dependencyScriptName; + + /** + * Set the configuration and extract the configuration parameters of interest + * @param conf the new configuration + */ + @Override + public void setConf (Configuration conf) { + super.setConf(conf); + if (conf != null) { + dependencyScriptName = conf.get(DEPENDENCY_SCRIPT_FILENAME_KEY); + } else { + dependencyScriptName = null; + } + } + + /** + * Constructor. The mapping is not ready to use until + * {@link #setConf(Configuration)} has been called + */ + public RawScriptBasedMappingWithDependency() {} + + @Override + public List getDependency(String name) { + if (name==null || dependencyScriptName==null) { + return Collections.emptyList(); + } + + List m = new LinkedList(); + List args = new ArrayList(1); + args.add(name); + + String output = runResolveCommand(args,dependencyScriptName); + if (output != null) { + StringTokenizer allSwitchInfo = new StringTokenizer(output); + while (allSwitchInfo.hasMoreTokens()) { + String switchInfo = allSwitchInfo.nextToken(); + m.add(switchInfo); + } + } else { + // an error occurred. return null to signify this. + // (exn was already logged in runResolveCommand) + return null; + } + + return m; + } + + @Override + public String toString() { + return super.toString() + ", " + dependencyScriptName != null ? + ("dependency script " + dependencyScriptName) : NO_SCRIPT; + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestScriptBasedMappingWithDependency.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestScriptBasedMappingWithDependency.java new file mode 100644 index 00000000000..77da45b733e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestScriptBasedMappingWithDependency.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.net; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; + +import junit.framework.TestCase; +import org.junit.Test; + +public class TestScriptBasedMappingWithDependency extends TestCase { + + + public TestScriptBasedMappingWithDependency() { + + } + + @Test + public void testNoArgsMeansNoResult() { + Configuration conf = new Configuration(); + conf.setInt(ScriptBasedMapping.SCRIPT_ARG_COUNT_KEY, + ScriptBasedMapping.MIN_ALLOWABLE_ARGS - 1); + conf.set(ScriptBasedMapping.SCRIPT_FILENAME_KEY, "any-filename-1"); + conf.set(ScriptBasedMappingWithDependency.DEPENDENCY_SCRIPT_FILENAME_KEY, + "any-filename-2"); + conf.setInt(ScriptBasedMapping.SCRIPT_ARG_COUNT_KEY, 10); + + ScriptBasedMappingWithDependency mapping = createMapping(conf); + List names = new ArrayList(); + names.add("some.machine.name"); + names.add("other.machine.name"); + List result = mapping.resolve(names); + assertNull("Expected an empty list for resolve", result); + result = mapping.getDependency("some.machine.name"); + assertNull("Expected an empty list for getDependency", result); + } + + @Test + public void testNoFilenameMeansSingleSwitch() throws Throwable { + Configuration conf = new Configuration(); + ScriptBasedMapping mapping = createMapping(conf); + assertTrue("Expected to be single switch", mapping.isSingleSwitch()); + assertTrue("Expected to be single switch", + AbstractDNSToSwitchMapping.isMappingSingleSwitch(mapping)); + } + + @Test + public void testFilenameMeansMultiSwitch() throws Throwable { + Configuration conf = new Configuration(); + conf.set(ScriptBasedMapping.SCRIPT_FILENAME_KEY, "any-filename"); + ScriptBasedMapping mapping = createMapping(conf); + assertFalse("Expected to be multi switch", mapping.isSingleSwitch()); + mapping.setConf(new Configuration()); + assertTrue("Expected to be single switch", mapping.isSingleSwitch()); + } + + @Test + public void testNullConfig() throws Throwable { + ScriptBasedMapping mapping = createMapping(null); + assertTrue("Expected to be single switch", mapping.isSingleSwitch()); + } + + private ScriptBasedMappingWithDependency createMapping(Configuration conf) { + ScriptBasedMappingWithDependency mapping = + new ScriptBasedMappingWithDependency(); + mapping.setConf(conf); + return mapping; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 62a5947d3e5..9e1f69b419d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -13,6 +13,9 @@ Release 2.5.0 - UNRELEASED HDFS-6281. Provide option to use the NFS Gateway without having to use the Hadoop portmapper. (atm) + HDFS-5168. Add cross node dependency support to BlockPlacementPolicy. + (Nikola Vujic via szetszwo) + IMPROVEMENTS HDFS-6007. 
Update documentation about short-circuit local reads (iwasakims diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java index e0a9e2bce20..6710a79833c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java @@ -29,6 +29,8 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import java.util.Date; +import java.util.LinkedList; +import java.util.List; import static org.apache.hadoop.hdfs.DFSUtil.percent2String; @@ -50,6 +52,8 @@ public class DatanodeInfo extends DatanodeID implements Node { private int xceiverCount; private String location = NetworkTopology.DEFAULT_RACK; private String softwareVersion; + private List dependentHostNames = new LinkedList(); + // Datanode administrative states public enum AdminStates { @@ -271,6 +275,21 @@ public class DatanodeInfo extends DatanodeID implements Node { public synchronized void setNetworkLocation(String location) { this.location = NodeBase.normalize(location); } + + /** Add a hostname to a list of network dependencies */ + public void addDependentHostName(String hostname) { + dependentHostNames.add(hostname); + } + + /** List of Network dependencies */ + public List getDependentHostNames() { + return dependentHostNames; + } + + /** Sets the network dependencies */ + public void setDependentHostNames(List dependencyList) { + dependentHostNames = dependencyList; + } /** A formatted string for reporting the status of the DataNode. */ public String getDatanodeReport() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 4e956581084..92a620a3ec3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -842,7 +842,7 @@ public class Balancer { */ private static void checkReplicationPolicyCompatibility(Configuration conf ) throws UnsupportedActionException { - if (!(BlockPlacementPolicy.getInstance(conf, null, null) instanceof + if (!(BlockPlacementPolicy.getInstance(conf, null, null, null) instanceof BlockPlacementPolicyDefault)) { throw new UnsupportedActionException( "Balancer without BlockPlacementPolicyDefault"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 3c771860bca..0b4f68d7b7c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -267,7 +267,8 @@ public class BlockManager { blocksMap = new BlocksMap( LightWeightGSet.computeCapacity(2.0, "BlocksMap")); blockplacement = BlockPlacementPolicy.getInstance( - conf, stats, datanodeManager.getNetworkTopology()); + conf, stats, datanodeManager.getNetworkTopology(), + datanodeManager.getHost2DatanodeMap()); pendingReplications = new PendingReplicationBlocks(conf.getInt( 
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index 455b6708c7a..5a7f14fe82c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -139,7 +139,8 @@ public abstract class BlockPlacementPolicy { * @param clusterMap cluster topology */ abstract protected void initialize(Configuration conf, FSClusterStats stats, - NetworkTopology clusterMap); + NetworkTopology clusterMap, + Host2NodesMap host2datanodeMap); /** * Get an instance of the configured Block Placement Policy based on the @@ -153,14 +154,15 @@ public abstract class BlockPlacementPolicy { */ public static BlockPlacementPolicy getInstance(Configuration conf, FSClusterStats stats, - NetworkTopology clusterMap) { + NetworkTopology clusterMap, + Host2NodesMap host2datanodeMap) { final Class replicatorClass = conf.getClass( DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT, BlockPlacementPolicy.class); final BlockPlacementPolicy replicator = ReflectionUtils.newInstance( replicatorClass, conf); - replicator.initialize(conf, stats, clusterMap); + replicator.initialize(conf, stats, clusterMap, host2datanodeMap); return replicator; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index da29f49bffb..8dc933b30b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -70,6 +70,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { protected boolean considerLoad; private boolean preferLocalNode = true; protected NetworkTopology clusterMap; + protected Host2NodesMap host2datanodeMap; private FSClusterStats stats; protected long heartbeatInterval; // interval for DataNode heartbeats private long staleInterval; // interval used to identify stale DataNodes @@ -80,8 +81,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { protected int tolerateHeartbeatMultiplier; protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats, - NetworkTopology clusterMap) { - initialize(conf, stats, clusterMap); + NetworkTopology clusterMap, + Host2NodesMap host2datanodeMap) { + initialize(conf, stats, clusterMap, host2datanodeMap); } protected BlockPlacementPolicyDefault() { @@ -89,11 +91,13 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { @Override public void initialize(Configuration conf, FSClusterStats stats, - NetworkTopology clusterMap) { + NetworkTopology clusterMap, + Host2NodesMap host2datanodeMap) { this.considerLoad = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true); this.stats = stats; this.clusterMap = clusterMap; + 
this.host2datanodeMap = host2datanodeMap; this.heartbeatInterval = conf.getLong( DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java index ae5660ee51f..98069de6dd5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java @@ -47,16 +47,17 @@ import org.apache.hadoop.net.NodeBase; public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault { protected BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats, - NetworkTopology clusterMap) { - initialize(conf, stats, clusterMap); + NetworkTopology clusterMap, DatanodeManager datanodeManager) { + initialize(conf, stats, clusterMap, host2datanodeMap); } protected BlockPlacementPolicyWithNodeGroup() { } public void initialize(Configuration conf, FSClusterStats stats, - NetworkTopology clusterMap) { - super.initialize(conf, stats, clusterMap); + NetworkTopology clusterMap, + Host2NodesMap host2datanodeMap) { + super.initialize(conf, stats, clusterMap, host2datanodeMap); } /** choose local node of localMachine as the target. @@ -243,6 +244,36 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau countOfExcludedNodes++; } } + + countOfExcludedNodes += addDependentNodesToExcludedNodes( + chosenNode, excludedNodes); + return countOfExcludedNodes; + } + + /** + * Add all nodes from a dependent nodes list to excludedNodes. 
+ * @return number of new excluded nodes + */ + private int addDependentNodesToExcludedNodes(DatanodeDescriptor chosenNode, + Set excludedNodes) { + if (this.host2datanodeMap == null) { + return 0; + } + int countOfExcludedNodes = 0; + for(String hostname : chosenNode.getDependentHostNames()) { + DatanodeDescriptor node = + this.host2datanodeMap.getDataNodeByHostName(hostname); + if(node!=null) { + if (excludedNodes.add(node)) { + countOfExcludedNodes++; + } + } else { + LOG.warn("Not able to find datanode " + hostname + + " which has dependency with datanode " + + chosenNode.getHostName()); + } + } + return countOfExcludedNodes; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 3e7c7105d3a..cf24f03a859 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -373,6 +373,11 @@ public class DatanodeManager { return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort); } + /** @return the Host2NodesMap */ + public Host2NodesMap getHost2DatanodeMap() { + return this.host2DatanodeMap; + } + /** * Given datanode address or host name, returns the DatanodeDescriptor for the * same, or if it doesn't find the datanode, it looks for a machine local and @@ -677,6 +682,52 @@ public class DatanodeManager { return networkLocation; } + /** + * Resolve a node's dependencies in the network. If the DNS to switch + * mapping fails then this method returns empty list of dependencies + * @param node to get dependencies for + * @return List of dependent host names + */ + private List getNetworkDependenciesWithDefault(DatanodeInfo node) { + List dependencies; + try { + dependencies = getNetworkDependencies(node); + } catch (UnresolvedTopologyException e) { + LOG.error("Unresolved dependency mapping for host " + + node.getHostName() +". Continuing with an empty dependency list"); + dependencies = Collections.emptyList(); + } + return dependencies; + } + + /** + * Resolves a node's dependencies in the network. If the DNS to switch + * mapping fails to get dependencies, then this method throws + * UnresolvedTopologyException. + * @param node to get dependencies for + * @return List of dependent host names + * @throws UnresolvedTopologyException if the DNS to switch mapping fails + */ + private List getNetworkDependencies(DatanodeInfo node) + throws UnresolvedTopologyException { + List dependencies = Collections.emptyList(); + + if (dnsToSwitchMapping instanceof DNSToSwitchMappingWithDependency) { + //Get dependencies + dependencies = + ((DNSToSwitchMappingWithDependency)dnsToSwitchMapping).getDependency( + node.getHostName()); + if(dependencies == null) { + LOG.error("The dependency call returned null for host " + + node.getHostName()); + throw new UnresolvedTopologyException("The dependency call returned " + + "null for host " + node.getHostName()); + } + } + + return dependencies; + } + /** * Remove an already decommissioned data node who is neither in include nor * exclude hosts lists from the the list of live or dead nodes. 
This is used @@ -868,12 +919,14 @@ public class DatanodeManager { nodeS.setDisallowed(false); // Node is in the include list // resolve network location - if(this.rejectUnresolvedTopologyDN) - { - nodeS.setNetworkLocation(resolveNetworkLocation(nodeS)); + if(this.rejectUnresolvedTopologyDN) { + nodeS.setNetworkLocation(resolveNetworkLocation(nodeS)); + nodeS.setDependentHostNames(getNetworkDependencies(nodeS)); } else { nodeS.setNetworkLocation( resolveNetworkLocationWithFallBackToDefaultLocation(nodeS)); + nodeS.setDependentHostNames( + getNetworkDependenciesWithDefault(nodeS)); } getNetworkTopology().add(nodeS); @@ -899,9 +952,12 @@ public class DatanodeManager { // resolve network location if(this.rejectUnresolvedTopologyDN) { nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr)); + nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr)); } else { nodeDescr.setNetworkLocation( resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr)); + nodeDescr.setDependentHostNames( + getNetworkDependenciesWithDefault(nodeDescr)); } networktopology.add(nodeDescr); nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java index e6f00246d89..420c1414d5d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.DFSUtil; @InterfaceAudience.Private @InterfaceStability.Evolving class Host2NodesMap { + private HashMap mapHost = new HashMap(); private final HashMap map = new HashMap(); private final ReadWriteLock hostmapLock = new ReentrantReadWriteLock(); @@ -69,6 +70,10 @@ class Host2NodesMap { } String ipAddr = node.getIpAddr(); + String hostname = node.getHostName(); + + mapHost.put(hostname, ipAddr); + DatanodeDescriptor[] nodes = map.get(ipAddr); DatanodeDescriptor[] newNodes; if (nodes==null) { @@ -95,6 +100,7 @@ class Host2NodesMap { } String ipAddr = node.getIpAddr(); + String hostname = node.getHostName(); hostmapLock.writeLock().lock(); try { @@ -105,6 +111,8 @@ class Host2NodesMap { if (nodes.length==1) { if (nodes[0]==node) { map.remove(ipAddr); + //remove hostname key since last datanode is removed + mapHost.remove(hostname); return true; } else { return false; @@ -188,12 +196,40 @@ class Host2NodesMap { } } + + + /** get a data node by its hostname. This should be used if only one + * datanode service is running on a hostname. If multiple datanodes + * are running on a hostname then use methods getDataNodeByXferAddr and + * getDataNodeByHostNameAndPort. + * @return DatanodeDescriptor if found; otherwise null. 
+ */ + DatanodeDescriptor getDataNodeByHostName(String hostname) { + if(hostname == null) { + return null; + } + + hostmapLock.readLock().lock(); + try { + String ipAddr = mapHost.get(hostname); + if(ipAddr == null) { + return null; + } else { + return getDatanodeByHost(ipAddr); + } + } finally { + hostmapLock.readLock().unlock(); + } + } + @Override public String toString() { final StringBuilder b = new StringBuilder(getClass().getSimpleName()) .append("["); - for(Map.Entry e : map.entrySet()) { - b.append("\n " + e.getKey() + " => " + Arrays.asList(e.getValue())); + for(Map.Entry host: mapHost.entrySet()) { + DatanodeDescriptor[] e = map.get(host.getValue()); + b.append("\n " + host.getKey() + " => "+host.getValue() + " => " + + Arrays.asList(e)); } return b.append("\n]").toString(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index b73a46d6744..9a66505436b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -172,8 +172,10 @@ public class NamenodeFsck { this.minReplication = minReplication; this.remoteAddress = remoteAddress; this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null, - networktopology); - + networktopology, + namenode.getNamesystem().getBlockManager().getDatanodeManager() + .getHost2DatanodeMap()); + for (Iterator it = pmap.keySet().iterator(); it.hasNext();) { String key = it.next(); if (key.equals("path")) { this.path = pmap.get("path")[0]; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index c5df001a998..585e55467e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -917,29 +917,47 @@ public class DFSTestUtil { return getDatanodeDescriptor(ipAddr, DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation); } + + public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr, + String rackLocation, String hostname) { + return getDatanodeDescriptor(ipAddr, + DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation, hostname); + } public static DatanodeStorageInfo createDatanodeStorageInfo( String storageID, String ip) { - return createDatanodeStorageInfo(storageID, ip, "defaultRack"); + return createDatanodeStorageInfo(storageID, ip, "defaultRack", "host"); } + public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] racks) { - return createDatanodeStorageInfos(racks.length, racks); + return createDatanodeStorageInfos(racks, null); } - public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n, String... 
racks) { + + public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] racks, String[] hostnames) { + return createDatanodeStorageInfos(racks.length, racks, hostnames); + } + + public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n) { + return createDatanodeStorageInfos(n, null, null); + } + + public static DatanodeStorageInfo[] createDatanodeStorageInfos( + int n, String[] racks, String[] hostnames) { DatanodeStorageInfo[] storages = new DatanodeStorageInfo[n]; for(int i = storages.length; i > 0; ) { final String storageID = "s" + i; final String ip = i + "." + i + "." + i + "." + i; i--; - final String rack = i < racks.length? racks[i]: "defaultRack"; - storages[i] = createDatanodeStorageInfo(storageID, ip, rack); + final String rack = (racks!=null && i < racks.length)? racks[i]: "defaultRack"; + final String hostname = (hostnames!=null && i < hostnames.length)? hostnames[i]: "host"; + storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname); } return storages; } public static DatanodeStorageInfo createDatanodeStorageInfo( - String storageID, String ip, String rack) { + String storageID, String ip, String rack, String hostname) { final DatanodeStorage storage = new DatanodeStorage(storageID); - final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage); + final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage, hostname); return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage); } public static DatanodeDescriptor[] toDatanodeDescriptor( @@ -952,8 +970,8 @@ public class DFSTestUtil { } public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr, - int port, String rackLocation) { - DatanodeID dnId = new DatanodeID(ipAddr, "host", + int port, String rackLocation, String hostname) { + DatanodeID dnId = new DatanodeID(ipAddr, hostname, UUID.randomUUID().toString(), port, DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT, DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT, @@ -961,6 +979,11 @@ public class DFSTestUtil { return new DatanodeDescriptor(dnId, rackLocation); } + public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr, + int port, String rackLocation) { + return getDatanodeDescriptor(ipAddr, port, rackLocation, "host"); + } + public static DatanodeRegistration getLocalDatanodeRegistration() { return new DatanodeRegistration(getLocalDatanodeID(), new StorageInfo( NodeType.DATA_NODE), new ExportedBlockKeys(), VersionInfo.getVersion()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index 0336fb4ede6..8ff716387ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -236,8 +236,13 @@ public class BlockManagerTestUtil { public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr, String rackLocation, DatanodeStorage storage) { + return getDatanodeDescriptor(ipAddr, rackLocation, storage, "host"); + } + + public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr, + String rackLocation, DatanodeStorage storage, String hostname) { DatanodeDescriptor dn = DFSTestUtil.getDatanodeDescriptor(ipAddr, - DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, 
rackLocation); + DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation, hostname); if (storage != null) { dn.updateStorage(storage); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java index ffcc18cc789..ce7340648f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java @@ -47,11 +47,13 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; + public class TestReplicationPolicyWithNodeGroup { private static final int BLOCK_SIZE = 1024; private static final int NUM_OF_DATANODES = 8; private static final int NUM_OF_DATANODES_BOUNDARY = 6; private static final int NUM_OF_DATANODES_MORE_TARGETS = 12; + private static final int NUM_OF_DATANODES_FOR_DEPENDENCIES = 6; private final Configuration CONF = new HdfsConfiguration(); private NetworkTopology cluster; private NameNode namenode; @@ -113,7 +115,33 @@ public class TestReplicationPolicyWithNodeGroup { private final static DatanodeDescriptor NODE = new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7")); - + + private static final DatanodeStorageInfo[] storagesForDependencies; + private static final DatanodeDescriptor[] dataNodesForDependencies; + static { + final String[] racksForDependencies = { + "/d1/r1/n1", + "/d1/r1/n1", + "/d1/r1/n2", + "/d1/r1/n2", + "/d1/r1/n3", + "/d1/r1/n4" + }; + final String[] hostNamesForDependencies = { + "h1", + "h2", + "h3", + "h4", + "h5", + "h6" + }; + + storagesForDependencies = DFSTestUtil.createDatanodeStorageInfos( + racksForDependencies, hostNamesForDependencies); + dataNodesForDependencies = DFSTestUtil.toDatanodeDescriptor(storagesForDependencies); + + }; + @Before public void setUp() throws Exception { FileSystem.setDefaultUri(CONF, "hdfs://localhost:0"); @@ -720,5 +748,63 @@ public class TestReplicationPolicyWithNodeGroup { assertEquals(targets.length, 6); } - + @Test + public void testChooseTargetWithDependencies() throws Exception { + for(int i=0; i node2, and node3<->node4) + dataNodesForDependencies[1].addDependentHostName( + dataNodesForDependencies[2].getHostName()); + dataNodesForDependencies[2].addDependentHostName( + dataNodesForDependencies[1].getHostName()); + dataNodesForDependencies[3].addDependentHostName( + dataNodesForDependencies[4].getHostName()); + dataNodesForDependencies[4].addDependentHostName( + dataNodesForDependencies[3].getHostName()); + + //Update heartbeat + for(int i=0; i chosenNodes = new ArrayList(); + + DatanodeStorageInfo[] targets; + Set excludedNodes = new HashSet(); + excludedNodes.add(dataNodesForDependencies[5]); + + //try to select three targets as there are three node groups + targets = chooseTarget(3, dataNodesForDependencies[1], chosenNodes, excludedNodes); + + //Even there are three node groups, verify that + //only two targets are selected due to dependencies + assertEquals(targets.length, 2); + assertEquals(targets[0], storagesForDependencies[1]); + assertTrue(targets[1].equals(storagesForDependencies[3]) || targets[1].equals(storagesForDependencies[4])); + + //verify that all data nodes are in the excluded list + assertEquals(excludedNodes.size(), 
NUM_OF_DATANODES_FOR_DEPENDENCIES); + for(int i=0; i
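
For illustration only (not part of the patch): a minimal sketch of how the new dependency mapping introduced here might be configured and queried. The script paths and the host name below are placeholders, and the sketch assumes a dependency script that prints the dependent host names of its argument as whitespace-separated tokens, which is how ScriptBasedMappingWithDependency tokenizes the script output.

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
    import org.apache.hadoop.net.ScriptBasedMappingWithDependency;

    public class DependencyMappingExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Existing key: script that resolves a host to its rack/node group.
        conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
            "/etc/hadoop/topology.sh");
        // Key added by this patch: script that resolves a host to its dependent hosts.
        conf.set(CommonConfigurationKeysPublic.NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY,
            "/etc/hadoop/topology-dependency.sh");

        ScriptBasedMappingWithDependency mapping =
            new ScriptBasedMappingWithDependency();
        mapping.setConf(conf);

        // Hosts returned here share a fault domain with the given host, so the
        // block placement policy should avoid putting two replicas on them.
        List<String> dependencies = mapping.getDependency("dn1.example.com");
        System.out.println("Hosts dependent on dn1.example.com: " + dependencies);
      }
    }

In a real cluster the mapping class would instead be selected through net.topology.node.switch.mapping.impl; DatanodeManager then calls getDependency() for each registering datanode and BlockPlacementPolicyWithNodeGroup adds the dependent nodes to the excluded set when choosing replica targets.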