HDFS-10208. Addendum for HDFS-9579: to handle the case when client machine can't resolve network path (Ming Ma via sjlee)

This commit is contained in:
Sangjin Lee 2016-05-16 18:49:47 -07:00
parent 730bc746f9
commit 61f46be071
8 changed files with 173 additions and 35 deletions

View File

@ -111,7 +111,12 @@ public class CommonConfigurationKeysPublic {
public static final String FS_TRASH_INTERVAL_KEY = "fs.trash.interval";
/** Default value for FS_TRASH_INTERVAL_KEY */
public static final long FS_TRASH_INTERVAL_DEFAULT = 0;
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a>. */
public static final String FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED =
"fs.client.resolve.topology.enabled";
/** Default value for FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED. */
public static final boolean FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED_DEFAULT =
false;
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String IO_MAPFILE_BLOOM_SIZE_KEY =
"io.mapfile.bloom.size";

View File

@ -655,6 +655,41 @@ public class NetworkTopology {
return dis+2;
}
/** Return the distance between two nodes by comparing their network paths
* without checking if they belong to the same ancestor node by reference.
* It is assumed that the distance from one node to its parent is 1
* The distance between two nodes is calculated by summing up their distances
* to their closest common ancestor.
* @param node1 one node
* @param node2 another node
* @return the distance between node1 and node2
*/
static public int getDistanceByPath(Node node1, Node node2) {
if (node1 == null && node2 == null) {
return 0;
}
if (node1 == null || node2 == null) {
LOG.warn("One of the nodes is a null pointer");
return Integer.MAX_VALUE;
}
String[] paths1 = NodeBase.getPathComponents(node1);
String[] paths2 = NodeBase.getPathComponents(node2);
int dis = 0;
int index = 0;
int minLevel = Math.min(paths1.length, paths2.length);
while (index < minLevel) {
if (!paths1[index].equals(paths2[index])) {
// Once the path starts to diverge, compute the distance that include
// the rest of paths.
dis += 2 * (minLevel - index);
break;
}
index++;
}
dis += Math.abs(paths1.length - paths2.length);
return dis;
}
/** Check if two nodes are on the same rack
* @param node1 one node (can be null)
* @param node2 another node (can be null)

View File

@ -113,6 +113,15 @@ public class NodeBase implements Node {
return node.getNetworkLocation() + PATH_SEPARATOR_STR + node.getName();
}
/**
* Get the path components of a node.
* @param node a non-null node
* @return the path of a node
*/
public static String[] getPathComponents(Node node) {
return getPath(node).split(PATH_SEPARATOR_STR);
}
@Override
public boolean equals(Object to) {
if (this == to) {

View File

@ -2143,4 +2143,17 @@
<name>hadoop.http.logs.enabled</name>
<value>true</value>
</property>
<property>
<name>fs.client.resolve.topology.enabled</name>
<value>false</value>
<description>Whether the client machine will use the class specified by
property net.topology.node.switch.mapping.impl to compute the network
distance between itself and remote machines of the FileSystem. Additional
properties might need to be configured depending on the class specified
in net.topology.node.switch.mapping.impl. For example, if
org.apache.hadoop.net.ScriptBasedMapping is used, a valid script file
needs to be specified in net.topology.script.file.name.
</description>
</property>
</configuration>

View File

@ -17,11 +17,12 @@
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED;
import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED_DEFAULT;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -113,9 +114,8 @@ public class ClientContext {
*/
private boolean printedConfWarning = false;
private final NetworkTopology topology;
private final NodeBase clientNode;
private final Map<NodeBase, Integer> nodeToDistance;
private NodeBase clientNode;
private boolean topologyResolutionEnabled;
private ClientContext(String name, DfsClientConf conf,
Configuration config) {
@ -133,19 +133,29 @@ public class ClientContext {
this.byteArrayManager = ByteArrayManager.newInstance(
conf.getWriteByteArrayManagerConf());
initTopologyResolution(config);
}
private void initTopologyResolution(Configuration config) {
topologyResolutionEnabled = config.getBoolean(
FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED,
FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED_DEFAULT);
if (!topologyResolutionEnabled) {
return;
}
DNSToSwitchMapping dnsToSwitchMapping = ReflectionUtils.newInstance(
config.getClass(
CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
ScriptBasedMapping.class, DNSToSwitchMapping.class), config);
List<String> nodes = new ArrayList<>();
String clientHostName = NetUtils.getLocalHostname();
List<String> nodes = new ArrayList<>();
nodes.add(clientHostName);
clientNode = new NodeBase(clientHostName,
dnsToSwitchMapping.resolve(nodes).get(0));
this.topology = NetworkTopology.getInstance(config);
this.topology.add(clientNode);
this.nodeToDistance = new ConcurrentHashMap<>();
List<String> resolvedHosts = dnsToSwitchMapping.resolve(nodes);
if (resolvedHosts != null && !resolvedHosts.isEmpty() &&
!resolvedHosts.get(0).equals(NetworkTopology.DEFAULT_RACK)) {
// The client machine is able to resolve its own network location.
this.clientNode = new NodeBase(clientHostName, resolvedHosts.get(0));
}
}
public static ClientContext get(String name, DfsClientConf conf,
@ -229,14 +239,15 @@ public class ClientContext {
}
public int getNetworkDistance(DatanodeInfo datanodeInfo) {
// If applications disable the feature or the client machine can't
// resolve its network location, clientNode will be set to null.
if (clientNode == null) {
return DFSUtilClient.isLocalAddress(NetUtils.
createSocketAddr(datanodeInfo.getXferAddr())) ? 0 :
Integer.MAX_VALUE;
}
NodeBase node = new NodeBase(datanodeInfo.getHostName(),
datanodeInfo.getNetworkLocation());
Integer distance = nodeToDistance.get(node);
if (distance == null) {
topology.add(node);
distance = topology.getDistance(clientNode, node);
nodeToDistance.put(node, distance);
}
return distance;
return NetworkTopology.getDistanceByPath(clientNode, node);
}
}

View File

@ -189,7 +189,8 @@ public class MiniDFSCluster {
private boolean checkDataNodeHostConfig = false;
private Configuration[] dnConfOverlays;
private boolean skipFsyncForTesting = true;
private boolean useConfiguredTopologyMappingClass = false;
public Builder(Configuration conf) {
this.conf = conf;
this.storagesPerDatanode =
@ -433,7 +434,14 @@ public class MiniDFSCluster {
this.skipFsyncForTesting = val;
return this;
}
public Builder useConfiguredTopologyMappingClass(
boolean useConfiguredTopologyMappingClass) {
this.useConfiguredTopologyMappingClass =
useConfiguredTopologyMappingClass;
return this;
}
/**
* Construct the actual MiniDFSCluster
*/
@ -501,7 +509,8 @@ public class MiniDFSCluster {
builder.checkDataNodeAddrConfig,
builder.checkDataNodeHostConfig,
builder.dnConfOverlays,
builder.skipFsyncForTesting);
builder.skipFsyncForTesting,
builder.useConfiguredTopologyMappingClass);
}
public class DataNodeProperties {
@ -756,12 +765,13 @@ public class MiniDFSCluster {
operation, null, racks, hosts,
null, simulatedCapacities, null, true, false,
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0),
true, false, false, null, true);
true, false, false, null, true, false);
}
private void initMiniDFSCluster(
Configuration conf,
int numDataNodes, StorageType[][] storageTypes, boolean format, boolean manageNameDfsDirs,
int numDataNodes, StorageType[][] storageTypes, boolean format,
boolean manageNameDfsDirs,
boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy,
boolean manageDataDfsDirs, StartupOption startOpt,
StartupOption dnStartOpt, String[] racks,
@ -772,7 +782,8 @@ public class MiniDFSCluster {
boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays,
boolean skipFsyncForTesting)
boolean skipFsyncForTesting,
boolean useConfiguredTopologyMappingClass)
throws IOException {
boolean success = false;
try {
@ -797,9 +808,11 @@ public class MiniDFSCluster {
DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY, 0);
conf.setInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, safemodeExtension);
conf.setInt(DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 3); // 3 second
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
StaticMapping.class, DNSToSwitchMapping.class);
if (!useConfiguredTopologyMappingClass) {
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
StaticMapping.class, DNSToSwitchMapping.class);
}
// In an HA cluster, in order for the StandbyNode to perform checkpoints,
// it needs to know the HTTP port of the Active. So, if ephemeral ports
// are chosen, disable checkpoints for the test.

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@ -80,7 +82,10 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum;
@ -798,39 +803,77 @@ public class TestDistributedFileSystem {
@Test
public void testLocalHostReadStatistics() throws Exception {
testReadFileSystemStatistics(0);
testReadFileSystemStatistics(0, false, false);
}
@Test
public void testLocalRackReadStatistics() throws Exception {
testReadFileSystemStatistics(2);
testReadFileSystemStatistics(2, false, false);
}
@Test
public void testRemoteRackOfFirstDegreeReadStatistics() throws Exception {
testReadFileSystemStatistics(4);
testReadFileSystemStatistics(4, false, false);
}
@Test
public void testInvalidScriptMappingFileReadStatistics() throws Exception {
// Even though network location of the client machine is unknown,
// MiniDFSCluster's datanode is on the local host and thus the network
// distance is 0.
testReadFileSystemStatistics(0, true, true);
}
@Test
public void testEmptyScriptMappingFileReadStatistics() throws Exception {
// Network location of the client machine is resolved to
// {@link NetworkTopology#DEFAULT_RACK} when there is no script file
// defined. This is equivalent to unknown network location.
// MiniDFSCluster's datanode is on the local host and thus the network
// distance is 0.
testReadFileSystemStatistics(0, true, false);
}
/** expectedDistance is the expected distance between client and dn.
* 0 means local host.
* 2 means same rack.
* 4 means remote rack of first degree.
* invalidScriptMappingConfig is used to test
*/
private void testReadFileSystemStatistics(int expectedDistance)
private void testReadFileSystemStatistics(int expectedDistance,
boolean useScriptMapping, boolean invalidScriptMappingFile)
throws IOException {
MiniDFSCluster cluster = null;
StaticMapping.addNodeToRack(NetUtils.getLocalHostname(), "/rackClient");
final Configuration conf = getTestConfiguration();
conf.setBoolean(FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED, true);
// ClientContext is cached globally by default thus we will end up using
// the network distance computed by other test cases.
// Use different value for DFS_CLIENT_CONTEXT in each test case so that it
// can compute network distance independently.
conf.set(DFS_CLIENT_CONTEXT, "testContext_" + expectedDistance);
// create a cluster with a dn with the expected distance.
if (expectedDistance == 0) {
// MiniDFSCluster by default uses StaticMapping unless the test
// overrides it.
if (useScriptMapping) {
conf.setClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
ScriptBasedMapping.class, DNSToSwitchMapping.class);
if (invalidScriptMappingFile) {
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
"invalidScriptFile.txt");
}
cluster = new MiniDFSCluster.Builder(conf).
useConfiguredTopologyMappingClass(true).build();
} else if (expectedDistance == 0) {
cluster = new MiniDFSCluster.Builder(conf).
hosts(new String[] {NetUtils.getLocalHostname()}).build();
} else if (expectedDistance == 2) {
cluster = new MiniDFSCluster.Builder(conf).
hosts(new String[] {"hostFoo"}).build();
racks(new String[]{"/rackClient"}).build();
} else if (expectedDistance == 4) {
cluster = new MiniDFSCluster.Builder(conf).
racks(new String[] {"/rackFoo"}).build();
racks(new String[]{"/rackFoo"}).build();
}
// create a file, read the file and verify the metrics

View File

@ -144,6 +144,15 @@ public class TestNetworkTopology {
NodeBase node2 = new NodeBase(dataNodes[0].getHostName(),
dataNodes[0].getNetworkLocation());
assertEquals(0, cluster.getDistance(node1, node2));
// verify the distance can be computed by path.
// They don't need to refer to the same object or parents.
NodeBase node3 = new NodeBase(dataNodes[3].getHostName(),
dataNodes[3].getNetworkLocation());
NodeBase node4 = new NodeBase(dataNodes[6].getHostName(),
dataNodes[6].getNetworkLocation());
assertEquals(0, NetworkTopology.getDistanceByPath(node1, node2));
assertEquals(4, NetworkTopology.getDistanceByPath(node2, node3));
assertEquals(6, NetworkTopology.getDistanceByPath(node2, node4));
}
@Test