svn merge -c 1592179 from trunk for HDFS-5168. Add cross node dependency support to BlockPlacementPolicy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1592181 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2014-05-03 11:04:35 +00:00
parent 775b27a6fe
commit 75955e7c50
19 changed files with 644 additions and 37 deletions

View File

@@ -78,6 +78,8 @@ public class CommonConfigurationKeysPublic {
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY =
"net.topology.table.file.name";
+ public static final String NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY =
+ "net.topology.dependency.script.file.name";
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String FS_TRASH_CHECKPOINT_INTERVAL_KEY =
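As a quick illustration of how the new key is meant to be wired up (a sketch, not part of the patch; both script paths are hypothetical), the dependency script is configured next to the existing topology script:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

public class TopologyDependencyConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Equivalent to setting these properties in core-site.xml:
    //   net.topology.script.file.name            -> maps hosts to racks/node groups
    //   net.topology.dependency.script.file.name -> maps a host to its dependent hosts
    conf.set("net.topology.script.file.name", "/etc/hadoop/topology.sh");
    conf.set(CommonConfigurationKeysPublic.NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY,
        "/etc/hadoop/topology-dependency.sh");
    System.out.println(conf.get(
        CommonConfigurationKeysPublic.NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY));
  }
}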

View File

@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.net;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* An interface that must be implemented to allow pluggable
* DNS-name/IP-address to RackID resolvers that can also supply
* cross-node dependency information.
*
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface DNSToSwitchMappingWithDependency extends DNSToSwitchMapping {
/**
* Get a list of dependent DNS-names for a given DNS-name/IP-address.
* Dependent DNS-names fall into the same fault domain which must be
* taken into account when placing replicas. This is intended to be used for
* cross node group dependencies when node groups are not sufficient to
* distinguish data nodes by fault domains. In practice, this is needed when
* a compute server runs VMs which use shared storage (as opposed to
* directly attached storage). In this case data nodes fall into two different
* fault domains. One fault domain is defined by a compute server and
* the other is defined by storage. With node groups we can group data nodes
* either by server fault domain or by storage fault domain. However, one of
* the fault domains cannot be handled with node groups alone, so we need to
* define cross node group dependencies. These dependencies are applied by
* block placement policies, which ensure that no two replicas are placed on
* two dependent nodes.
* @param name - host name or IP address of a data node. The input host name
* must match the value of dfs.datanode.hostname if that config property is
* set. Otherwise the FQDN of the data node is used.
* @return list of dependent host names. If the dfs.datanode.hostname config
* property is set, then its value must be returned.
* Otherwise, the FQDN is returned.
*/
public List<String> getDependency(String name);
}
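To show how this contract is consumed, here is a minimal sketch (class and parameter names are hypothetical); it mirrors the pattern the namenode's DatanodeManager uses later in this change, including the null check for a failed resolution:

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMappingWithDependency;

public class DependencyLookupSketch {
  /**
   * Returns the dependent hosts for the given host, or an empty list when the
   * configured mapping has no notion of dependencies. A null return from
   * getDependency() signals a resolution failure and is passed through so the
   * caller can decide whether to fail or fall back.
   */
  public static List<String> dependenciesOf(DNSToSwitchMapping mapping,
      String hostName) {
    if (mapping instanceof DNSToSwitchMappingWithDependency) {
      return ((DNSToSwitchMappingWithDependency) mapping).getDependency(hostName);
    }
    return Collections.emptyList();
  }
}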

View File

@@ -45,7 +45,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
- public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
+ public class ScriptBasedMapping extends CachedDNSToSwitchMapping {
/**
* Minimum number of arguments: {@value}
@@ -63,6 +63,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
*/
static final String SCRIPT_FILENAME_KEY =
CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY ;
/**
* key to the argument count that the script supports
* {@value}
@@ -84,7 +85,15 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
*
*/
public ScriptBasedMapping() {
- super(new RawScriptBasedMapping());
+ this(new RawScriptBasedMapping());
+ }
+ /**
+ * Create an instance from the given raw mapping
+ * @param rawMap raw DNSToSwitchMapping
+ */
+ public ScriptBasedMapping(DNSToSwitchMapping rawMap) {
+ super(rawMap);
}
/**
@@ -132,7 +141,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
* This is the uncached script mapping that is fed into the cache managed
* by the superclass {@link CachedDNSToSwitchMapping}
*/
- private static final class RawScriptBasedMapping
+ protected static class RawScriptBasedMapping
extends AbstractDNSToSwitchMapping {
private String scriptName;
private int maxArgs; //max hostnames per call of the script
@@ -176,7 +185,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
return m;
}
- String output = runResolveCommand(names);
+ String output = runResolveCommand(names, scriptName);
if (output != null) {
StringTokenizer allSwitchInfo = new StringTokenizer(output);
while (allSwitchInfo.hasMoreTokens()) {
@@ -208,7 +217,8 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
* @return null if the number of arguments is out of range,
* or the output of the command.
*/
- private String runResolveCommand(List<String> args) {
+ protected String runResolveCommand(List<String> args,
+ String commandScriptName) {
int loopCount = 0;
if (args.size() == 0) {
return null;
@@ -225,7 +235,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
while (numProcessed != args.size()) {
int start = maxArgs * loopCount;
List<String> cmdList = new ArrayList<String>();
- cmdList.add(scriptName);
+ cmdList.add(commandScriptName);
for (numProcessed = start; numProcessed < (start + maxArgs) &&
numProcessed < args.size(); numProcessed++) {
cmdList.add(args.get(numProcessed));

View File

@@ -0,0 +1,178 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.net;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
/**
* This class extends ScriptBasedMapping class and implements
* the {@link DNSToSwitchMappingWithDependency} interface using
* a script configured via the
* {@link CommonConfigurationKeys#NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY} option.
* <p/>
* It contains a static class <code>RawScriptBasedMappingWithDependency</code>
* that performs the getDependency work.
* <p/>
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ScriptBasedMappingWithDependency extends ScriptBasedMapping
implements DNSToSwitchMappingWithDependency {
/**
* key to the dependency script filename {@value}
*/
static final String DEPENDENCY_SCRIPT_FILENAME_KEY =
CommonConfigurationKeys.NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY;
private Map<String, List<String>> dependencyCache =
new ConcurrentHashMap<String, List<String>>();
/**
* Create an instance with the default configuration.
* <p/>
* Calling {@link #setConf(Configuration)} will trigger a
* re-evaluation of the configuration settings and so can be used to
* set up the mapping script.
*/
public ScriptBasedMappingWithDependency() {
super(new RawScriptBasedMappingWithDependency());
}
/**
* Get the cached mapping and convert it to its real type
* @return the inner raw script mapping.
*/
private RawScriptBasedMappingWithDependency getRawMapping() {
return (RawScriptBasedMappingWithDependency)rawMapping;
}
@Override
public String toString() {
return "script-based mapping with " + getRawMapping().toString();
}
/**
* {@inheritDoc}
* <p/>
* This will get called in the superclass constructor, so a check is needed
* to ensure that the raw mapping is defined before trying to relay a null
* configuration.
* @param conf
*/
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
getRawMapping().setConf(conf);
}
/**
* Get dependencies in the topology for a given host
* @param name - host name for which we are getting dependency
* @return a list of hosts dependent on the provided host name
*/
@Override
public List<String> getDependency(String name) {
//normalize all input names to be in the form of IP addresses
name = NetUtils.normalizeHostName(name);
if (name==null) {
return Collections.emptyList();
}
List<String> dependencies = dependencyCache.get(name);
if (dependencies == null) {
//not cached
dependencies = getRawMapping().getDependency(name);
if(dependencies != null) {
dependencyCache.put(name, dependencies);
}
}
return dependencies;
}
/**
* This is the uncached script mapping that is fed into the cache managed
* by the superclass {@link CachedDNSToSwitchMapping}
*/
private static final class RawScriptBasedMappingWithDependency
extends ScriptBasedMapping.RawScriptBasedMapping
implements DNSToSwitchMappingWithDependency {
private String dependencyScriptName;
/**
* Set the configuration and extract the configuration parameters of interest
* @param conf the new configuration
*/
@Override
public void setConf (Configuration conf) {
super.setConf(conf);
if (conf != null) {
dependencyScriptName = conf.get(DEPENDENCY_SCRIPT_FILENAME_KEY);
} else {
dependencyScriptName = null;
}
}
/**
* Constructor. The mapping is not ready to use until
* {@link #setConf(Configuration)} has been called
*/
public RawScriptBasedMappingWithDependency() {}
@Override
public List<String> getDependency(String name) {
if (name==null || dependencyScriptName==null) {
return Collections.emptyList();
}
List <String> m = new LinkedList<String>();
List <String> args = new ArrayList<String>(1);
args.add(name);
String output = runResolveCommand(args,dependencyScriptName);
if (output != null) {
StringTokenizer allSwitchInfo = new StringTokenizer(output);
while (allSwitchInfo.hasMoreTokens()) {
String switchInfo = allSwitchInfo.nextToken();
m.add(switchInfo);
}
} else {
// an error occurred. return null to signify this.
// (exn was already logged in runResolveCommand)
return null;
}
return m;
}
@Override
public String toString() {
return super.toString() + ", " + (dependencyScriptName != null ?
("dependency script " + dependencyScriptName) : NO_SCRIPT);
}
}
}
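A usage sketch (not part of the patch; the script paths and host names are hypothetical) showing how the two scripts are wired into this mapping:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.net.ScriptBasedMappingWithDependency;

public class ScriptBasedMappingWithDependencySketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
        "/etc/hadoop/topology.sh");             // hypothetical path
    conf.set(CommonConfigurationKeys.NET_DEPENDENCY_SCRIPT_FILE_NAME_KEY,
        "/etc/hadoop/topology-dependency.sh");  // hypothetical path

    ScriptBasedMappingWithDependency mapping =
        new ScriptBasedMappingWithDependency();
    mapping.setConf(conf);

    // Rack/node-group resolution still goes through the topology script,
    // while dependent hosts come from the dependency script; getDependency()
    // results are cached per host in dependencyCache.
    List<String> locations = mapping.resolve(Arrays.asList("h1", "h2"));
    List<String> dependentsOfH1 = mapping.getDependency("h1");
    System.out.println(locations + " " + dependentsOfH1);
  }
}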

View File

@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.net;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import junit.framework.TestCase;
import org.junit.Test;
public class TestScriptBasedMappingWithDependency extends TestCase {
public TestScriptBasedMappingWithDependency() {
}
@Test
public void testNoArgsMeansNoResult() {
Configuration conf = new Configuration();
conf.setInt(ScriptBasedMapping.SCRIPT_ARG_COUNT_KEY,
ScriptBasedMapping.MIN_ALLOWABLE_ARGS - 1);
conf.set(ScriptBasedMapping.SCRIPT_FILENAME_KEY, "any-filename-1");
conf.set(ScriptBasedMappingWithDependency.DEPENDENCY_SCRIPT_FILENAME_KEY,
"any-filename-2");
conf.setInt(ScriptBasedMapping.SCRIPT_ARG_COUNT_KEY, 10);
ScriptBasedMappingWithDependency mapping = createMapping(conf);
List<String> names = new ArrayList<String>();
names.add("some.machine.name");
names.add("other.machine.name");
List<String> result = mapping.resolve(names);
assertNull("Expected an empty list for resolve", result);
result = mapping.getDependency("some.machine.name");
assertNull("Expected an empty list for getDependency", result);
}
@Test
public void testNoFilenameMeansSingleSwitch() throws Throwable {
Configuration conf = new Configuration();
ScriptBasedMapping mapping = createMapping(conf);
assertTrue("Expected to be single switch", mapping.isSingleSwitch());
assertTrue("Expected to be single switch",
AbstractDNSToSwitchMapping.isMappingSingleSwitch(mapping));
}
@Test
public void testFilenameMeansMultiSwitch() throws Throwable {
Configuration conf = new Configuration();
conf.set(ScriptBasedMapping.SCRIPT_FILENAME_KEY, "any-filename");
ScriptBasedMapping mapping = createMapping(conf);
assertFalse("Expected to be multi switch", mapping.isSingleSwitch());
mapping.setConf(new Configuration());
assertTrue("Expected to be single switch", mapping.isSingleSwitch());
}
@Test
public void testNullConfig() throws Throwable {
ScriptBasedMapping mapping = createMapping(null);
assertTrue("Expected to be single switch", mapping.isSingleSwitch());
}
private ScriptBasedMappingWithDependency createMapping(Configuration conf) {
ScriptBasedMappingWithDependency mapping =
new ScriptBasedMappingWithDependency();
mapping.setConf(conf);
return mapping;
}
}

View File

@@ -13,6 +13,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6281. Provide option to use the NFS Gateway without having to use the
Hadoop portmapper. (atm)
+ HDFS-5168. Add cross node dependency support to BlockPlacementPolicy.
+ (Nikola Vujic via szetszwo)
IMPROVEMENTS
HDFS-6007. Update documentation about short-circuit local reads (iwasakims

View File

@@ -29,6 +29,8 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import java.util.Date;
+ import java.util.LinkedList;
+ import java.util.List;
import static org.apache.hadoop.hdfs.DFSUtil.percent2String;
@@ -50,6 +52,8 @@ public class DatanodeInfo extends DatanodeID implements Node {
private int xceiverCount;
private String location = NetworkTopology.DEFAULT_RACK;
private String softwareVersion;
+ private List<String> dependentHostNames = new LinkedList<String>();
// Datanode administrative states
public enum AdminStates {
@@ -272,6 +276,21 @@ public class DatanodeInfo extends DatanodeID implements Node {
this.location = NodeBase.normalize(location);
}
/** Add a hostname to a list of network dependencies */
public void addDependentHostName(String hostname) {
dependentHostNames.add(hostname);
}
/** List of Network dependencies */
public List<String> getDependentHostNames() {
return dependentHostNames;
}
/** Sets the network dependencies */
public void setDependentHostNames(List<String> dependencyList) {
dependentHostNames = dependencyList;
}
/** A formatted string for reporting the status of the DataNode. */
public String getDatanodeReport() {
StringBuilder buffer = new StringBuilder();
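For illustration, a small sketch (helper class and method names are hypothetical) of how the new accessors express a mutual dependency between two datanodes, e.g. a VM and the server hosting its shared storage:

import java.util.List;

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

public class DependentHostNamesSketch {
  /** Record that two datanodes share a fault domain, in both directions. */
  public static void markMutuallyDependent(DatanodeInfo a, DatanodeInfo b) {
    a.addDependentHostName(b.getHostName());
    b.addDependentHostName(a.getHostName());
  }

  /** True if b is listed among a's network dependencies. */
  public static boolean dependsOn(DatanodeInfo a, DatanodeInfo b) {
    List<String> dependents = a.getDependentHostNames();
    return dependents.contains(b.getHostName());
  }
}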

View File

@@ -842,7 +842,7 @@ public class Balancer {
*/
private static void checkReplicationPolicyCompatibility(Configuration conf
) throws UnsupportedActionException {
- if (!(BlockPlacementPolicy.getInstance(conf, null, null) instanceof
+ if (!(BlockPlacementPolicy.getInstance(conf, null, null, null) instanceof
BlockPlacementPolicyDefault)) {
throw new UnsupportedActionException(
"Balancer without BlockPlacementPolicyDefault");

View File

@@ -267,7 +267,8 @@ public class BlockManager {
blocksMap = new BlocksMap(
LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
blockplacement = BlockPlacementPolicy.getInstance(
- conf, stats, datanodeManager.getNetworkTopology());
+ conf, stats, datanodeManager.getNetworkTopology(),
+ datanodeManager.getHost2DatanodeMap());
pendingReplications = new PendingReplicationBlocks(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);

View File

@@ -139,7 +139,8 @@ public abstract class BlockPlacementPolicy {
* @param clusterMap cluster topology
*/
abstract protected void initialize(Configuration conf, FSClusterStats stats,
- NetworkTopology clusterMap);
+ NetworkTopology clusterMap,
+ Host2NodesMap host2datanodeMap);
/**
* Get an instance of the configured Block Placement Policy based on the
@@ -153,14 +154,15 @@
*/
public static BlockPlacementPolicy getInstance(Configuration conf,
FSClusterStats stats,
- NetworkTopology clusterMap) {
+ NetworkTopology clusterMap,
+ Host2NodesMap host2datanodeMap) {
final Class<? extends BlockPlacementPolicy> replicatorClass = conf.getClass(
DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
BlockPlacementPolicy.class);
final BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
replicatorClass, conf);
- replicator.initialize(conf, stats, clusterMap);
+ replicator.initialize(conf, stats, clusterMap, host2datanodeMap);
return replicator;
}

View File

@@ -70,6 +70,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
protected boolean considerLoad;
private boolean preferLocalNode = true;
protected NetworkTopology clusterMap;
+ protected Host2NodesMap host2datanodeMap;
private FSClusterStats stats;
protected long heartbeatInterval; // interval for DataNode heartbeats
private long staleInterval; // interval used to identify stale DataNodes
@@ -80,8 +81,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
protected int tolerateHeartbeatMultiplier;
protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
- NetworkTopology clusterMap) {
- initialize(conf, stats, clusterMap);
+ NetworkTopology clusterMap,
+ Host2NodesMap host2datanodeMap) {
+ initialize(conf, stats, clusterMap, host2datanodeMap);
}
protected BlockPlacementPolicyDefault() {
@@ -89,11 +91,13 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
@Override
public void initialize(Configuration conf, FSClusterStats stats,
- NetworkTopology clusterMap) {
+ NetworkTopology clusterMap,
+ Host2NodesMap host2datanodeMap) {
this.considerLoad = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
this.stats = stats;
this.clusterMap = clusterMap;
+ this.host2datanodeMap = host2datanodeMap;
this.heartbeatInterval = conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;

View File

@@ -47,16 +47,17 @@ import org.apache.hadoop.net.NodeBase;
public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
protected BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats,
- NetworkTopology clusterMap) {
- initialize(conf, stats, clusterMap);
+ NetworkTopology clusterMap, DatanodeManager datanodeManager) {
+ initialize(conf, stats, clusterMap, host2datanodeMap);
}
protected BlockPlacementPolicyWithNodeGroup() {
}
public void initialize(Configuration conf, FSClusterStats stats,
- NetworkTopology clusterMap) {
- super.initialize(conf, stats, clusterMap);
+ NetworkTopology clusterMap,
+ Host2NodesMap host2datanodeMap) {
+ super.initialize(conf, stats, clusterMap, host2datanodeMap);
}
/** choose local node of localMachine as the target.
@@ -243,6 +244,36 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
countOfExcludedNodes++;
}
}
countOfExcludedNodes += addDependentNodesToExcludedNodes(
chosenNode, excludedNodes);
return countOfExcludedNodes;
}
/**
* Add all nodes from a dependent nodes list to excludedNodes.
* @return number of new excluded nodes
*/
private int addDependentNodesToExcludedNodes(DatanodeDescriptor chosenNode,
Set<Node> excludedNodes) {
if (this.host2datanodeMap == null) {
return 0;
}
int countOfExcludedNodes = 0;
for(String hostname : chosenNode.getDependentHostNames()) {
DatanodeDescriptor node =
this.host2datanodeMap.getDataNodeByHostName(hostname);
if(node!=null) {
if (excludedNodes.add(node)) {
countOfExcludedNodes++;
}
} else {
LOG.warn("Not able to find datanode " + hostname
+ " which has dependency with datanode "
+ chosenNode.getHostName());
}
}
return countOfExcludedNodes;
}

View File

@@ -373,6 +373,11 @@ public class DatanodeManager {
return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort);
}
+ /** @return the Host2NodesMap */
+ public Host2NodesMap getHost2DatanodeMap() {
+ return this.host2DatanodeMap;
+ }
/**
* Given datanode address or host name, returns the DatanodeDescriptor for the
* same, or if it doesn't find the datanode, it looks for a machine local and
@@ -677,6 +682,52 @@ public class DatanodeManager {
return networkLocation;
}
/**
* Resolve a node's dependencies in the network. If the DNS to switch
* mapping fails then this method returns an empty list of dependencies.
* @param node to get dependencies for
* @return List of dependent host names
*/
private List<String> getNetworkDependenciesWithDefault(DatanodeInfo node) {
List<String> dependencies;
try {
dependencies = getNetworkDependencies(node);
} catch (UnresolvedTopologyException e) {
LOG.error("Unresolved dependency mapping for host " +
node.getHostName() +". Continuing with an empty dependency list");
dependencies = Collections.emptyList();
}
return dependencies;
}
/**
* Resolves a node's dependencies in the network. If the DNS to switch
* mapping fails to get dependencies, then this method throws
* UnresolvedTopologyException.
* @param node to get dependencies for
* @return List of dependent host names
* @throws UnresolvedTopologyException if the DNS to switch mapping fails
*/
private List<String> getNetworkDependencies(DatanodeInfo node)
throws UnresolvedTopologyException {
List<String> dependencies = Collections.emptyList();
if (dnsToSwitchMapping instanceof DNSToSwitchMappingWithDependency) {
//Get dependencies
dependencies =
((DNSToSwitchMappingWithDependency)dnsToSwitchMapping).getDependency(
node.getHostName());
if(dependencies == null) {
LOG.error("The dependency call returned null for host " +
node.getHostName());
throw new UnresolvedTopologyException("The dependency call returned " +
"null for host " + node.getHostName());
}
}
return dependencies;
}
/**
* Remove an already decommissioned data node who is neither in include nor
* exclude hosts lists from the list of live or dead nodes. This is used
@@ -868,12 +919,14 @@
nodeS.setDisallowed(false); // Node is in the include list
// resolve network location
- if(this.rejectUnresolvedTopologyDN)
- {
+ if(this.rejectUnresolvedTopologyDN) {
nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
+ nodeS.setDependentHostNames(getNetworkDependencies(nodeS));
} else {
nodeS.setNetworkLocation(
resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
+ nodeS.setDependentHostNames(
+ getNetworkDependenciesWithDefault(nodeS));
}
getNetworkTopology().add(nodeS);
@@ -899,9 +952,12 @@
// resolve network location
if(this.rejectUnresolvedTopologyDN) {
nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
+ nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr));
} else {
nodeDescr.setNetworkLocation(
resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
+ nodeDescr.setDependentHostNames(
+ getNetworkDependenciesWithDefault(nodeDescr));
}
networktopology.add(nodeDescr);
nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());

View File

@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
@InterfaceAudience.Private
@InterfaceStability.Evolving
class Host2NodesMap {
+ private HashMap<String, String> mapHost = new HashMap<String, String>();
private final HashMap<String, DatanodeDescriptor[]> map
= new HashMap<String, DatanodeDescriptor[]>();
private final ReadWriteLock hostmapLock = new ReentrantReadWriteLock();
@@ -69,6 +70,10 @@
}
String ipAddr = node.getIpAddr();
+ String hostname = node.getHostName();
+ mapHost.put(hostname, ipAddr);
DatanodeDescriptor[] nodes = map.get(ipAddr);
DatanodeDescriptor[] newNodes;
if (nodes==null) {
@@ -95,6 +100,7 @@
}
String ipAddr = node.getIpAddr();
+ String hostname = node.getHostName();
hostmapLock.writeLock().lock();
try {
@@ -105,6 +111,8 @@
if (nodes.length==1) {
if (nodes[0]==node) {
map.remove(ipAddr);
+ //remove hostname key since last datanode is removed
+ mapHost.remove(hostname);
return true;
} else {
return false;
@@ -188,12 +196,40 @@
}
}
/** get a data node by its hostname. This should be used if only one
* datanode service is running on a hostname. If multiple datanodes
* are running on a hostname then use methods getDataNodeByXferAddr and
* getDataNodeByHostNameAndPort.
* @return DatanodeDescriptor if found; otherwise null.
*/
DatanodeDescriptor getDataNodeByHostName(String hostname) {
if(hostname == null) {
return null;
}
hostmapLock.readLock().lock();
try {
String ipAddr = mapHost.get(hostname);
if(ipAddr == null) {
return null;
} else {
return getDatanodeByHost(ipAddr);
}
} finally {
hostmapLock.readLock().unlock();
}
}
@Override
public String toString() {
final StringBuilder b = new StringBuilder(getClass().getSimpleName())
.append("[");
- for(Map.Entry<String, DatanodeDescriptor[]> e : map.entrySet()) {
- b.append("\n " + e.getKey() + " => " + Arrays.asList(e.getValue()));
+ for(Map.Entry<String, String> host: mapHost.entrySet()) {
+ DatanodeDescriptor[] e = map.get(host.getValue());
+ b.append("\n " + host.getKey() + " => "+host.getValue() + " => "
+ + Arrays.asList(e));
}
return b.append("\n]").toString();
}

View File

@@ -172,7 +172,9 @@ public class NamenodeFsck {
this.minReplication = minReplication;
this.remoteAddress = remoteAddress;
this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null,
- networktopology);
+ networktopology,
+ namenode.getNamesystem().getBlockManager().getDatanodeManager()
+ .getHost2DatanodeMap());
for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
String key = it.next();

View File

@@ -918,28 +918,46 @@ public class DFSTestUtil {
rackLocation);
}
+ public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+ String rackLocation, String hostname) {
+ return getDatanodeDescriptor(ipAddr,
+ DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation, hostname);
+ }
public static DatanodeStorageInfo createDatanodeStorageInfo(
String storageID, String ip) {
- return createDatanodeStorageInfo(storageID, ip, "defaultRack");
+ return createDatanodeStorageInfo(storageID, ip, "defaultRack", "host");
}
public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] racks) {
- return createDatanodeStorageInfos(racks.length, racks);
+ return createDatanodeStorageInfos(racks, null);
}
- public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n, String... racks) {
+ public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] racks, String[] hostnames) {
+ return createDatanodeStorageInfos(racks.length, racks, hostnames);
+ }
+ public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n) {
+ return createDatanodeStorageInfos(n, null, null);
+ }
+ public static DatanodeStorageInfo[] createDatanodeStorageInfos(
+ int n, String[] racks, String[] hostnames) {
DatanodeStorageInfo[] storages = new DatanodeStorageInfo[n];
for(int i = storages.length; i > 0; ) {
final String storageID = "s" + i;
final String ip = i + "." + i + "." + i + "." + i;
i--;
- final String rack = i < racks.length? racks[i]: "defaultRack";
- storages[i] = createDatanodeStorageInfo(storageID, ip, rack);
+ final String rack = (racks!=null && i < racks.length)? racks[i]: "defaultRack";
+ final String hostname = (hostnames!=null && i < hostnames.length)? hostnames[i]: "host";
+ storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname);
}
return storages;
}
public static DatanodeStorageInfo createDatanodeStorageInfo(
- String storageID, String ip, String rack) {
+ String storageID, String ip, String rack, String hostname) {
final DatanodeStorage storage = new DatanodeStorage(storageID);
- final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage);
+ final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage, hostname);
return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage);
}
public static DatanodeDescriptor[] toDatanodeDescriptor(
@@ -952,8 +970,8 @@
}
public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
- int port, String rackLocation) {
+ int port, String rackLocation, String hostname) {
- DatanodeID dnId = new DatanodeID(ipAddr, "host",
+ DatanodeID dnId = new DatanodeID(ipAddr, hostname,
UUID.randomUUID().toString(), port,
DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
@@ -961,6 +979,11 @@
return new DatanodeDescriptor(dnId, rackLocation);
}
+ public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+ int port, String rackLocation) {
+ return getDatanodeDescriptor(ipAddr, port, rackLocation, "host");
+ }
public static DatanodeRegistration getLocalDatanodeRegistration() {
return new DatanodeRegistration(getLocalDatanodeID(), new StorageInfo(
NodeType.DATA_NODE), new ExportedBlockKeys(), VersionInfo.getVersion());

View File

@@ -236,8 +236,13 @@ public class BlockManagerTestUtil {
public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
String rackLocation, DatanodeStorage storage) {
+ return getDatanodeDescriptor(ipAddr, rackLocation, storage, "host");
+ }
+ public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+ String rackLocation, DatanodeStorage storage, String hostname) {
DatanodeDescriptor dn = DFSTestUtil.getDatanodeDescriptor(ipAddr,
- DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation);
+ DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation, hostname);
if (storage != null) {
dn.updateStorage(storage);
}

View File

@@ -47,11 +47,13 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestReplicationPolicyWithNodeGroup {
private static final int BLOCK_SIZE = 1024;
private static final int NUM_OF_DATANODES = 8;
private static final int NUM_OF_DATANODES_BOUNDARY = 6;
private static final int NUM_OF_DATANODES_MORE_TARGETS = 12;
+ private static final int NUM_OF_DATANODES_FOR_DEPENDENCIES = 6;
private final Configuration CONF = new HdfsConfiguration();
private NetworkTopology cluster;
private NameNode namenode;
@@ -114,6 +116,32 @@ public class TestReplicationPolicyWithNodeGroup {
private final static DatanodeDescriptor NODE =
new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));
private static final DatanodeStorageInfo[] storagesForDependencies;
private static final DatanodeDescriptor[] dataNodesForDependencies;
static {
final String[] racksForDependencies = {
"/d1/r1/n1",
"/d1/r1/n1",
"/d1/r1/n2",
"/d1/r1/n2",
"/d1/r1/n3",
"/d1/r1/n4"
};
final String[] hostNamesForDependencies = {
"h1",
"h2",
"h3",
"h4",
"h5",
"h6"
};
storagesForDependencies = DFSTestUtil.createDatanodeStorageInfos(
racksForDependencies, hostNamesForDependencies);
dataNodesForDependencies = DFSTestUtil.toDatanodeDescriptor(storagesForDependencies);
};
@Before
public void setUp() throws Exception {
FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
@@ -720,5 +748,63 @@
assertEquals(targets.length, 6);
}
@Test
public void testChooseTargetWithDependencies() throws Exception {
for(int i=0; i<NUM_OF_DATANODES; i++) {
cluster.remove(dataNodes[i]);
}
for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
DatanodeDescriptor node = dataNodesInMoreTargetsCase[i];
if (cluster.contains(node)) {
cluster.remove(node);
}
}
Host2NodesMap host2DatanodeMap = namenode.getNamesystem()
.getBlockManager()
.getDatanodeManager().getHost2DatanodeMap();
for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
cluster.add(dataNodesForDependencies[i]);
host2DatanodeMap.add(dataNodesForDependencies[i]);
}
//add dependencies (node1 <-> node2, and node3<->node4)
dataNodesForDependencies[1].addDependentHostName(
dataNodesForDependencies[2].getHostName());
dataNodesForDependencies[2].addDependentHostName(
dataNodesForDependencies[1].getHostName());
dataNodesForDependencies[3].addDependentHostName(
dataNodesForDependencies[4].getHostName());
dataNodesForDependencies[4].addDependentHostName(
dataNodesForDependencies[3].getHostName());
//Update heartbeat
for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
updateHeartbeatWithUsage(dataNodesForDependencies[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
DatanodeStorageInfo[] targets;
Set<Node> excludedNodes = new HashSet<Node>();
excludedNodes.add(dataNodesForDependencies[5]);
//try to select three targets as there are three node groups
targets = chooseTarget(3, dataNodesForDependencies[1], chosenNodes, excludedNodes);
//Even though there are three node groups, verify that
//only two targets are selected due to dependencies
assertEquals(targets.length, 2);
assertEquals(targets[0], storagesForDependencies[1]);
assertTrue(targets[1].equals(storagesForDependencies[3]) || targets[1].equals(storagesForDependencies[4]));
//verify that all data nodes are in the excluded list
assertEquals(excludedNodes.size(), NUM_OF_DATANODES_FOR_DEPENDENCIES);
for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
assertTrue(excludedNodes.contains(dataNodesForDependencies[i]));
}
}
}

View File

@@ -68,6 +68,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.Result;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSck;
@@ -981,10 +983,15 @@ public class TestFsck {
PrintWriter out = new PrintWriter(result, true);
InetAddress remoteAddress = InetAddress.getLocalHost();
FSNamesystem fsName = mock(FSNamesystem.class);
+ BlockManager blockManager = mock(BlockManager.class);
+ DatanodeManager dnManager = mock(DatanodeManager.class);
when(namenode.getNamesystem()).thenReturn(fsName);
when(fsName.getBlockLocations(anyString(), anyLong(), anyLong(),
anyBoolean(), anyBoolean(), anyBoolean())).
thenThrow(new FileNotFoundException()) ;
+ when(fsName.getBlockManager()).thenReturn(blockManager);
+ when(blockManager.getDatanodeManager()).thenReturn(dnManager);
NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out,
NUM_REPLICAS, (short)1, remoteAddress);