HDFS-10208. Addendum for HDFS-9579: to handle the case when client machine can't resolve network path (Ming Ma via sjlee)

(cherry picked from commit 61f46be071)
Committed by Sangjin Lee on 2016-05-16 18:49:47 -07:00
parent 09a613b023
commit 9330a7b4de
8 changed files with 174 additions and 36 deletions

File: CommonConfigurationKeysPublic.java

@@ -116,7 +116,12 @@ public class CommonConfigurationKeysPublic {
   public static final String FS_TRASH_INTERVAL_KEY = "fs.trash.interval";
   /** Default value for FS_TRASH_INTERVAL_KEY */
   public static final long FS_TRASH_INTERVAL_DEFAULT = 0;
+  /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a>. */
+  public static final String FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED =
+      "fs.client.resolve.topology.enabled";
+  /** Default value for FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED. */
+  public static final boolean FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED_DEFAULT =
+      false;
   /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
   public static final String IO_MAPFILE_BLOOM_SIZE_KEY =
       "io.mapfile.bloom.size";

File: NetworkTopology.java

@@ -655,6 +655,41 @@ public class NetworkTopology {
     return dis+2;
   }

+  /** Return the distance between two nodes by comparing their network paths,
+   * without checking whether they belong to the same ancestor node by
+   * reference. It is assumed that the distance from one node to its parent
+   * is 1. The distance between two nodes is calculated by summing up their
+   * distances to their closest common ancestor.
+   * @param node1 one node
+   * @param node2 another node
+   * @return the distance between node1 and node2
+   */
+  static public int getDistanceByPath(Node node1, Node node2) {
+    if (node1 == null && node2 == null) {
+      return 0;
+    }
+    if (node1 == null || node2 == null) {
+      LOG.warn("One of the nodes is a null pointer");
+      return Integer.MAX_VALUE;
+    }
+    String[] paths1 = NodeBase.getPathComponents(node1);
+    String[] paths2 = NodeBase.getPathComponents(node2);
+    int dis = 0;
+    int index = 0;
+    int minLevel = Math.min(paths1.length, paths2.length);
+    while (index < minLevel) {
+      if (!paths1[index].equals(paths2[index])) {
+        // Once the paths start to diverge, the remaining components on both
+        // sides count toward the distance.
+        dis += 2 * (minLevel - index);
+        break;
+      }
+      index++;
+    }
+    dis += Math.abs(paths1.length - paths2.length);
+    return dis;
+  }
+
   /** Check if two nodes are on the same rack
   * @param node1 one node (can be null)
   * @param node2 another node (can be null)
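To make the computation concrete, here is a standalone sketch (the hostnames and rack paths are made up for illustration; NodeBase(name, location) is the two-argument constructor used elsewhere in this commit):

    Node n1 = new NodeBase("host1", "/d1/r1"); // components: ["", "d1", "r1", "host1"]
    Node n2 = new NodeBase("host2", "/d1/r2"); // components: ["", "d1", "r2", "host2"]
    // The components match at indices 0 and 1 and diverge at index 2, so the
    // loop adds 2 * (minLevel - index) = 2 * (4 - 2) = 4; the equal path
    // lengths contribute nothing more.
    int distance = NetworkTopology.getDistanceByPath(n1, n2); // 4

Because only the path strings are compared, neither node has to be registered in a NetworkTopology instance first, which is what lets ClientContext (below) drop its private topology tree and distance cache.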

File: NodeBase.java

@@ -113,6 +113,15 @@ public class NodeBase implements Node {
     return node.getNetworkLocation() + PATH_SEPARATOR_STR + node.getName();
   }

+  /**
+   * Get the path components of a node.
+   * @param node a non-null node
+   * @return the path components of the node
+   */
+  public static String[] getPathComponents(Node node) {
+    return getPath(node).split(PATH_SEPARATOR_STR);
+  }
+
   @Override
   public boolean equals(Object to) {
     if (this == to) {
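One subtlety worth noting: getPath() returns a string that begins with the path separator, so the split yields a leading empty component. A small sketch with a hypothetical node:

    Node n = new NodeBase("host1", "/d1/r1");
    String[] parts = NodeBase.getPathComponents(n);
    // parts is {"", "d1", "r1", "host1"}; index 0 is the empty root component.
    // getDistanceByPath compares the arrays element by element, and both
    // arrays always share this empty root, so divergence even at the top
    // level is still counted correctly.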

File: core-default.xml

@@ -2143,4 +2143,17 @@
   <name>hadoop.http.logs.enabled</name>
   <value>true</value>
 </property>
+
+<property>
+  <name>fs.client.resolve.topology.enabled</name>
+  <value>false</value>
+  <description>Whether the client machine will use the class specified by
+    property net.topology.node.switch.mapping.impl to compute the network
+    distance between itself and remote machines of the FileSystem. Additional
+    properties might need to be configured depending on the class specified
+    in net.topology.node.switch.mapping.impl. For example, if
+    org.apache.hadoop.net.ScriptBasedMapping is used, a valid script file
+    needs to be specified in net.topology.script.file.name.
+  </description>
+</property>
 </configuration>
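For a client opting in, the minimal setup looks like this (a sketch using the Configuration API; the script path is a placeholder, and the same keys would more typically be set in the client's core-site.xml):

    Configuration conf = new Configuration();
    // Enable client-side topology resolution (defaults to false).
    conf.setBoolean("fs.client.resolve.topology.enabled", true);
    // ScriptBasedMapping is the default mapping class and needs a script:
    conf.set("net.topology.script.file.name", "/etc/hadoop/topology.sh");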

File: ClientContext.java

@@ -17,11 +17,12 @@
  */
 package org.apache.hadoop.hdfs;

+import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED_DEFAULT;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;

@@ -113,9 +114,8 @@ public class ClientContext {
    */
   private boolean printedConfWarning = false;

-  private final NetworkTopology topology;
-  private final NodeBase clientNode;
-  private final Map<NodeBase, Integer> nodeToDistance;
+  private NodeBase clientNode;
+  private boolean topologyResolutionEnabled;

   private ClientContext(String name, DfsClientConf conf,
       Configuration config) {

@@ -133,19 +133,29 @@ public class ClientContext {
     this.byteArrayManager = ByteArrayManager.newInstance(
         conf.getWriteByteArrayManagerConf());

+    initTopologyResolution(config);
+  }
+
+  private void initTopologyResolution(Configuration config) {
+    topologyResolutionEnabled = config.getBoolean(
+        FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED,
+        FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED_DEFAULT);
+    if (!topologyResolutionEnabled) {
+      return;
+    }
     DNSToSwitchMapping dnsToSwitchMapping = ReflectionUtils.newInstance(
         config.getClass(
             CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
             ScriptBasedMapping.class, DNSToSwitchMapping.class), config);
-    List<String> nodes = new ArrayList<>();
     String clientHostName = NetUtils.getLocalHostname();
+    List<String> nodes = new ArrayList<>();
     nodes.add(clientHostName);
-    clientNode = new NodeBase(clientHostName,
-        dnsToSwitchMapping.resolve(nodes).get(0));
-    this.topology = NetworkTopology.getInstance(config);
-    this.topology.add(clientNode);
-    this.nodeToDistance = new ConcurrentHashMap<>();
+    List<String> resolvedHosts = dnsToSwitchMapping.resolve(nodes);
+    if (resolvedHosts != null && !resolvedHosts.isEmpty() &&
+        !resolvedHosts.get(0).equals(NetworkTopology.DEFAULT_RACK)) {
+      // The client machine is able to resolve its own network location.
+      this.clientNode = new NodeBase(clientHostName, resolvedHosts.get(0));
+    }
   }

   public static ClientContext get(String name, DfsClientConf conf,

@@ -229,14 +239,15 @@ public class ClientContext {
   }

   public int getNetworkDistance(DatanodeInfo datanodeInfo) {
+    // If applications disable the feature or the client machine can't
+    // resolve its network location, clientNode will be set to null.
+    if (clientNode == null) {
+      return DFSUtilClient.isLocalAddress(NetUtils.
+          createSocketAddr(datanodeInfo.getXferAddr())) ? 0 :
+          Integer.MAX_VALUE;
+    }
     NodeBase node = new NodeBase(datanodeInfo.getHostName(),
         datanodeInfo.getNetworkLocation());
-    Integer distance = nodeToDistance.get(node);
-    if (distance == null) {
-      topology.add(node);
-      distance = topology.getDistance(clientNode, node);
-      nodeToDistance.put(node, distance);
-    }
-    return distance;
+    return NetworkTopology.getDistanceByPath(clientNode, node);
   }
 }
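The resulting behavior of getNetworkDistance can be restated as a small decision table; the sketch below is an illustrative simplification, not the real class:

    // clientNode == null means the feature is disabled, or the client
    // machine could not resolve its own network location.
    static int networkDistance(NodeBase clientNode, NodeBase datanode,
        boolean datanodeIsLocal) {
      if (clientNode == null) {
        // Only a datanode on the local machine gets distance 0; everything
        // else is treated as unknown (Integer.MAX_VALUE).
        return datanodeIsLocal ? 0 : Integer.MAX_VALUE;
      }
      // Pure string comparison of network paths: no shared topology tree,
      // and therefore no per-node distance cache to maintain.
      return NetworkTopology.getDistanceByPath(clientNode, datanode);
    }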

File: MiniDFSCluster.java

@@ -189,6 +189,7 @@ public class MiniDFSCluster {
     private boolean checkDataNodeHostConfig = false;
     private Configuration[] dnConfOverlays;
     private boolean skipFsyncForTesting = true;
+    private boolean useConfiguredTopologyMappingClass = false;

     public Builder(Configuration conf) {
       this.conf = conf;

@@ -434,6 +435,13 @@ public class MiniDFSCluster {
       return this;
     }

+    public Builder useConfiguredTopologyMappingClass(
+        boolean useConfiguredTopologyMappingClass) {
+      this.useConfiguredTopologyMappingClass =
+          useConfiguredTopologyMappingClass;
+      return this;
+    }
+
     /**
      * Construct the actual MiniDFSCluster
      */

@@ -501,7 +509,8 @@ public class MiniDFSCluster {
           builder.checkDataNodeAddrConfig,
           builder.checkDataNodeHostConfig,
           builder.dnConfOverlays,
-          builder.skipFsyncForTesting);
+          builder.skipFsyncForTesting,
+          builder.useConfiguredTopologyMappingClass);
     }

   public class DataNodeProperties {

@@ -756,12 +765,13 @@ public class MiniDFSCluster {
         operation, null, racks, hosts,
         null, simulatedCapacities, null, true, false,
         MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0),
-        true, false, false, null, true);
+        true, false, false, null, true, false);
   }

   private void initMiniDFSCluster(
       Configuration conf,
-      int numDataNodes, StorageType[][] storageTypes, boolean format, boolean manageNameDfsDirs,
+      int numDataNodes, StorageType[][] storageTypes, boolean format,
+      boolean manageNameDfsDirs,
       boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy,
       boolean manageDataDfsDirs, StartupOption startOpt,
       StartupOption dnStartOpt, String[] racks,

@@ -772,7 +782,8 @@ public class MiniDFSCluster {
       boolean checkDataNodeAddrConfig,
       boolean checkDataNodeHostConfig,
       Configuration[] dnConfOverlays,
-      boolean skipFsyncForTesting)
+      boolean skipFsyncForTesting,
+      boolean useConfiguredTopologyMappingClass)
       throws IOException {
     boolean success = false;
     try {

@@ -797,8 +808,10 @@ public class MiniDFSCluster {
         DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY, 0);
     conf.setInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, safemodeExtension);
     conf.setInt(DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 3); // 3 second
-    conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
-        StaticMapping.class, DNSToSwitchMapping.class);
+    if (!useConfiguredTopologyMappingClass) {
+      conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          StaticMapping.class, DNSToSwitchMapping.class);
+    }

     // In an HA cluster, in order for the StandbyNode to perform checkpoints,
     // it needs to know the HTTP port of the Active. So, if ephemeral ports
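A test that wants MiniDFSCluster to honor the mapping class already present in its Configuration, rather than having it overwritten with StaticMapping, would build the cluster like this (mirroring the test change below):

    Configuration conf = new Configuration();
    conf.setClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
        ScriptBasedMapping.class, DNSToSwitchMapping.class);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .useConfiguredTopologyMappingClass(true)
        .build();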

File: TestDistributedFileSystem.java

@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hdfs;

+import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;

@@ -88,8 +90,11 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.web.HftpFileSystem;
 import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.net.StaticMapping;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DataChecksum;

@@ -661,39 +666,77 @@ public class TestDistributedFileSystem {
   @Test
   public void testLocalHostReadStatistics() throws Exception {
-    testReadFileSystemStatistics(0);
+    testReadFileSystemStatistics(0, false, false);
   }

   @Test
   public void testLocalRackReadStatistics() throws Exception {
-    testReadFileSystemStatistics(2);
+    testReadFileSystemStatistics(2, false, false);
   }

   @Test
   public void testRemoteRackOfFirstDegreeReadStatistics() throws Exception {
-    testReadFileSystemStatistics(4);
+    testReadFileSystemStatistics(4, false, false);
+  }
+
+  @Test
+  public void testInvalidScriptMappingFileReadStatistics() throws Exception {
+    // Even though the network location of the client machine is unknown,
+    // MiniDFSCluster's datanode is on the local host and thus the network
+    // distance is 0.
+    testReadFileSystemStatistics(0, true, true);
+  }
+
+  @Test
+  public void testEmptyScriptMappingFileReadStatistics() throws Exception {
+    // The network location of the client machine resolves to
+    // {@link NetworkTopology#DEFAULT_RACK} when no script file is defined,
+    // which is equivalent to an unknown network location.
+    // MiniDFSCluster's datanode is on the local host and thus the network
+    // distance is 0.
+    testReadFileSystemStatistics(0, true, false);
   }

   /** expectedDistance is the expected distance between client and dn.
    * 0 means local host.
    * 2 means same rack.
    * 4 means remote rack of first degree.
+   * invalidScriptMappingFile points the topology script at a nonexistent
+   * file, so the client's network location cannot be resolved.
    */
-  private void testReadFileSystemStatistics(int expectedDistance)
+  private void testReadFileSystemStatistics(int expectedDistance,
+      boolean useScriptMapping, boolean invalidScriptMappingFile)
       throws IOException {
     MiniDFSCluster cluster = null;
+    StaticMapping.addNodeToRack(NetUtils.getLocalHostname(), "/rackClient");
     final Configuration conf = getTestConfiguration();
+    conf.setBoolean(FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED, true);
+    // ClientContext is cached globally by default, so we would otherwise end
+    // up reusing the network distance computed by other test cases. Use a
+    // different value for DFS_CLIENT_CONTEXT in each test case so that each
+    // one computes its network distance independently.
+    conf.set(DFS_CLIENT_CONTEXT, "testContext_" + expectedDistance);

     // create a cluster with a dn with the expected distance.
-    if (expectedDistance == 0) {
+    // MiniDFSCluster by default uses StaticMapping unless the test
+    // overrides it.
+    if (useScriptMapping) {
+      conf.setClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          ScriptBasedMapping.class, DNSToSwitchMapping.class);
+      if (invalidScriptMappingFile) {
+        conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
+            "invalidScriptFile.txt");
+      }
+      cluster = new MiniDFSCluster.Builder(conf).
+          useConfiguredTopologyMappingClass(true).build();
+    } else if (expectedDistance == 0) {
       cluster = new MiniDFSCluster.Builder(conf).
           hosts(new String[] {NetUtils.getLocalHostname()}).build();
     } else if (expectedDistance == 2) {
       cluster = new MiniDFSCluster.Builder(conf).
-          hosts(new String[] {"hostFoo"}).build();
+          racks(new String[]{"/rackClient"}).build();
     } else if (expectedDistance == 4) {
       cluster = new MiniDFSCluster.Builder(conf).
-          racks(new String[] {"/rackFoo"}).build();
+          racks(new String[]{"/rackFoo"}).build();
     }

     // create a file, read the file and verify the metrics
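The verification truncated from this view presumably reads the per-distance counters that HDFS-9579 added to FileSystem.Statistics. A hedged sketch of such a check (assuming Statistics#getBytesReadByDistance(int), the DFSTestUtil helpers, and a made-up file size; not the literal test body):

    FileSystem fs = cluster.getFileSystem();
    Path path = new Path("/test-read-stats");
    DFSTestUtil.createFile(fs, path, 4096, (short) 1, 0L);
    DFSTestUtil.readFile(fs, path);
    FileSystem.Statistics stats = FileSystem.getStatistics(
        fs.getUri().getScheme(), fs.getClass());
    // All bytes read should land in the expected distance bucket.
    assertEquals(4096, stats.getBytesReadByDistance(expectedDistance));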

File: TestNetworkTopology.java

@@ -144,6 +144,15 @@ public class TestNetworkTopology {
     NodeBase node2 = new NodeBase(dataNodes[0].getHostName(),
         dataNodes[0].getNetworkLocation());
     assertEquals(0, cluster.getDistance(node1, node2));
+    // verify that the distance can be computed by path. The nodes don't
+    // need to refer to the same objects or share parent objects by
+    // reference.
+    NodeBase node3 = new NodeBase(dataNodes[3].getHostName(),
+        dataNodes[3].getNetworkLocation());
+    NodeBase node4 = new NodeBase(dataNodes[6].getHostName(),
+        dataNodes[6].getNetworkLocation());
+    assertEquals(0, NetworkTopology.getDistanceByPath(node1, node2));
+    assertEquals(4, NetworkTopology.getDistanceByPath(node2, node3));
+    assertEquals(6, NetworkTopology.getDistanceByPath(node2, node4));
   }

   @Test
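The asserted values follow directly from the path arithmetic. The concrete locations below are hypothetical but consistent with the assertions (dataNodes[3] in the same data center on another rack, dataNodes[6] in a different data center):

    // node2 at /d1/r1/h0 vs node3 at /d1/r2/h3:
    //   ["", "d1", "r1", "h0"] vs ["", "d1", "r2", "h3"] diverge at index 2,
    //   so the distance is 2 * (4 - 2) = 4.
    // node2 at /d1/r1/h0 vs node4 at /d2/r3/h6:
    //   ["", "d1", "r1", "h0"] vs ["", "d2", "r3", "h6"] diverge at index 1,
    //   so the distance is 2 * (4 - 1) = 6.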