HDFS-5846. Merging change r1581091 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1581092 13f79535-47bb-0310-9956-ffa450edef68
Author: Chris Nauroth    Date: 2014-03-24 22:22:33 +00:00
parent 069e6b389e
commit 02b4b702d3
6 changed files with 188 additions and 18 deletions

File: CHANGES.txt

@@ -441,6 +441,9 @@ Release 2.4.0 - UNRELEASED
    HDFS-6135. In HDFS upgrade with HA setup, JournalNode cannot handle layout
    version bump when rolling back. (jing9)

+    HDFS-5846. Assigning DEFAULT_RACK in resolveNetworkLocation method can break
+    data resiliency. (Nikola Vujic via cnauroth)

  BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS

    HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

File: DFSConfigKeys.java

@@ -612,7 +612,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
  public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT = 500;
  public static final String DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY = "dfs.http.client.failover.sleep.max.millis";
  public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT = 15000;
+  // Handling unresolved DN topology mapping
+  public static final String  DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY =
+      "dfs.namenode.reject-unresolved-dn-topology-mapping";
+  public static final boolean DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT =
+      false;
  // hedged read properties
  public static final String DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
      "dfs.client.hedged.read.threshold.millis";

File: DatanodeManager.java

@@ -98,6 +98,7 @@ public class DatanodeManager {
  private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
  private final DNSToSwitchMapping dnsToSwitchMapping;
+  private final boolean rejectUnresolvedTopologyDN;
  private final int defaultXferPort;
@@ -201,6 +202,10 @@ public class DatanodeManager {
        conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
            ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);
+    this.rejectUnresolvedTopologyDN = conf.getBoolean(
+        DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY,
+        DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT);
+
    // If the dns to switch mapping supports cache, resolve network
    // locations of those hosts in the include list and store the mapping
    // in the cache; so future calls to resolve will be fast.
@@ -391,7 +396,8 @@ DatanodeDescriptor getDatanodeDescriptor(String address) {
      node = getDatanodeByHost(host);
    }
    if (node == null) {
-      String networkLocation = resolveNetworkLocation(dnId);
+      String networkLocation =
+          resolveNetworkLocationWithFallBackToDefaultLocation(dnId);
      // If the current cluster doesn't contain the node, fallback to
      // something machine local and then rack local.
@@ -626,9 +632,36 @@ public HashMap<String, Integer> getDatanodesSoftwareVersions() {
      return new HashMap<String, Integer> (this.datanodesSoftwareVersions);
    }
  }

-  /* Resolve a node's network location */
-  private String resolveNetworkLocation (DatanodeID node) {
+  /**
+   * Resolve a node's network location. If the DNS to switch mapping fails
+   * then this method guarantees default rack location.
+   * @param node to resolve to network location
+   * @return network location path
+   */
+  private String resolveNetworkLocationWithFallBackToDefaultLocation (
+      DatanodeID node) {
+    String networkLocation;
+    try {
+      networkLocation = resolveNetworkLocation(node);
+    } catch (UnresolvedTopologyException e) {
+      LOG.error("Unresolved topology mapping. Using " +
+          NetworkTopology.DEFAULT_RACK + " for host " + node.getHostName());
+      networkLocation = NetworkTopology.DEFAULT_RACK;
+    }
+    return networkLocation;
+  }
+
+  /**
+   * Resolve a node's network location. If the DNS to switch mapping fails,
+   * then this method throws UnresolvedTopologyException.
+   * @param node to resolve to network location
+   * @return network location path.
+   * @throws UnresolvedTopologyException if the DNS to switch mapping fails
+   *   to resolve network location.
+   */
+  private String resolveNetworkLocation (DatanodeID node)
+      throws UnresolvedTopologyException {
    List<String> names = new ArrayList<String>(1);
    if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
      names.add(node.getIpAddr());
@@ -640,9 +673,9 @@ private String resolveNetworkLocation (DatanodeID node) {
    List<String> rName = dnsToSwitchMapping.resolve(names);
    String networkLocation;
    if (rName == null) {
-      LOG.error("The resolve call returned null! Using " +
-          NetworkTopology.DEFAULT_RACK + " for host " + names);
-      networkLocation = NetworkTopology.DEFAULT_RACK;
+      LOG.error("The resolve call returned null!");
+      throw new UnresolvedTopologyException(
+          "Unresolved topology mapping for host " + node.getHostName());
    } else {
      networkLocation = rName.get(0);
    }
@@ -754,9 +787,11 @@ void stopDecommission(DatanodeDescriptor node) {
   * @param nodeReg the datanode registration
   * @throws DisallowedDatanodeException if the registration request is
   *    denied because the datanode does not match includes/excludes
+  * @throws UnresolvedTopologyException if the registration request is
+  *    denied because resolving datanode network location fails.
   */
  public void registerDatanode(DatanodeRegistration nodeReg)
-      throws DisallowedDatanodeException {
+      throws DisallowedDatanodeException, UnresolvedTopologyException {
    InetAddress dnAddress = Server.getRemoteIp();
    if (dnAddress != null) {
      // Mostly called inside an RPC, update ip and peer hostname
@@ -838,7 +873,13 @@ nodes with its data cleared (or user can just remove the StorageID
        nodeS.setDisallowed(false); // Node is in the include list

        // resolve network location
-        nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
+        if(this.rejectUnresolvedTopologyDN)
+        {
+          nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
+        } else {
+          nodeS.setNetworkLocation(
+              resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
+        }
        getNetworkTopology().add(nodeS);

        // also treat the registration message as a heartbeat
@@ -860,7 +901,13 @@ nodes with its data cleared (or user can just remove the StorageID
        = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
    boolean success = false;
    try {
-      nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
+      // resolve network location
+      if(this.rejectUnresolvedTopologyDN) {
+        nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
+      } else {
+        nodeDescr.setNetworkLocation(
+            resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
+      }
      networktopology.add(nodeDescr);
      nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());

File: UnresolvedTopologyException.java (new file)

@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
/**
* This exception is thrown if resolving topology path
* for a node fails.
*/
public class UnresolvedTopologyException extends IOException {
/** for java.io.Serializable */
private static final long serialVersionUID = 1L;
public UnresolvedTopologyException(String text) {
super(text);
}
}

File: hdfs-default.xml

@@ -1841,4 +1841,19 @@
  </description>
</property>

+<property>
+  <name>dfs.namenode.reject-unresolved-dn-topology-mapping</name>
+  <value>false</value>
+  <description>
+    If the value is set to true, then namenode will reject datanode
+    registration if the topology mapping for a datanode is not resolved and
+    NULL is returned (script defined by net.topology.script.file.name fails
+    to execute). Otherwise, datanode will be registered and the default rack
+    will be assigned as the topology path. Topology paths are important for
+    data resiliency, since they define fault domains. Thus it may be unwanted
+    behavior to allow datanode registration with the default rack if the
+    resolving topology failed.
+  </description>
+</property>
+
 </configuration>
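For operators who want the stricter behavior described above, the property can be overridden in the cluster's hdfs-site.xml. A minimal sketch, assuming the standard Hadoop configuration layout; the property name and semantics come from the diff above, while the value shown is an illustrative override rather than the shipped default:

<!-- hdfs-site.xml (illustrative override): reject datanode registrations whose
     network location cannot be resolved, instead of falling back to the
     default rack. A working net.topology.script.file.name (or other
     DNSToSwitchMapping) is then required for datanodes to register. -->
<property>
  <name>dfs.namenode.reject-unresolved-dn-topology-mapping</name>
  <value>true</value>
</property>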

File: TestDatanodeManager.java

@@ -21,21 +21,29 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mortbay.log.Log;

 import static org.junit.Assert.*;

 public class TestDatanodeManager {
+  public static final Log LOG = LogFactory.getLog(TestDatanodeManager.class);

   //The number of times the registration / removal of nodes should happen
   final int NUM_ITERATIONS = 500;
@@ -57,7 +65,7 @@ public void testNumVersionsReportedCorrect() throws IOException {
    Random rng = new Random();
    int seed = rng.nextInt();
    rng = new Random(seed);
-    Log.info("Using seed " + seed + " for testing");
+    LOG.info("Using seed " + seed + " for testing");

    //A map of the Storage IDs to the DN registration it was registered with
    HashMap <String, DatanodeRegistration> sIdToDnReg =
@@ -76,7 +84,7 @@ public void testNumVersionsReportedCorrect() throws IOException {
        it.next();
      }
      DatanodeRegistration toRemove = it.next().getValue();
-      Log.info("Removing node " + toRemove.getDatanodeUuid() + " ip " +
+      LOG.info("Removing node " + toRemove.getDatanodeUuid() + " ip " +
        toRemove.getXferAddr() + " version : " + toRemove.getSoftwareVersion());

      //Remove that random node
@@ -110,7 +118,7 @@ public void testNumVersionsReportedCorrect() throws IOException {
      Mockito.when(dr.getSoftwareVersion()).thenReturn(
        "version" + rng.nextInt(5));

-      Log.info("Registering node storageID: " + dr.getDatanodeUuid() +
+      LOG.info("Registering node storageID: " + dr.getDatanodeUuid() +
        ", version: " + dr.getSoftwareVersion() + ", IP address: "
        + dr.getXferAddr());
@@ -136,7 +144,7 @@ public void testNumVersionsReportedCorrect() throws IOException {
        }
      }
      for(Entry <String, Integer> entry: mapToCheck.entrySet()) {
-        Log.info("Still in map: " + entry.getKey() + " has "
+        LOG.info("Still in map: " + entry.getKey() + " has "
                + entry.getValue());
      }
      assertEquals("The map of version counts returned by DatanodeManager was"
@@ -144,5 +152,62 @@ public void testNumVersionsReportedCorrect() throws IOException {
        mapToCheck.size());
    }
  }

+  @Test (timeout = 100000)
+  public void testRejectUnresolvedDatanodes() throws IOException {
+    //Create the DatanodeManager which will be tested
+    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
+    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
+
+    Configuration conf = new Configuration();
+
+    //Set configuration property for rejecting unresolved topology mapping
+    conf.setBoolean(
+        DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY, true);
+
+    //set TestDatanodeManager.MyResolver to be used for topology resolving
+    conf.setClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        TestDatanodeManager.MyResolver.class, DNSToSwitchMapping.class);
+
+    //create DatanodeManager
+    DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
+        fsn, conf);
+
+    //storageID to register.
+    String storageID = "someStorageID-123";
+    DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
+    Mockito.when(dr.getDatanodeUuid()).thenReturn(storageID);
+
+    try {
+      //Register this node
+      dm.registerDatanode(dr);
+      Assert.fail("Expected an UnresolvedTopologyException");
+    } catch (UnresolvedTopologyException ute) {
+      LOG.info("Expected - topology is not resolved and " +
+          "registration is rejected.");
+    } catch (Exception e) {
+      Assert.fail("Expected an UnresolvedTopologyException");
+    }
+  }
+
+  /**
+   * MyResolver class provides resolve method which always returns null
+   * in order to simulate unresolved topology mapping.
+   */
+  public static class MyResolver implements DNSToSwitchMapping {
+    @Override
+    public List<String> resolve(List<String> names) {
+      return null;
+    }
+
+    @Override
+    public void reloadCachedMappings() {
+    }
+
+    @Override
+    public void reloadCachedMappings(List<String> names) {
+    }
+  }
 }