From 64741f46352f25743bfb77f804a06970d355a177 Mon Sep 17 00:00:00 2001
From: Aaron Myers
Date: Mon, 18 Mar 2013 17:20:13 +0000
Subject: [PATCH] HDFS-4521. Invalid network topologies should not be cached.
 Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1457878 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop/net/CachedDNSToSwitchMapping.java  |   5 +
 .../apache/hadoop/net/DNSToSwitchMapping.java |   8 +
 .../apache/hadoop/net/NetworkTopology.java    |  16 +-
 .../java/org/apache/hadoop/net/NodeBase.java  |  12 ++
 .../apache/hadoop/net/ScriptBasedMapping.java |   6 +
 .../org/apache/hadoop/net/TableMapping.java   |  56 +++--
 .../org/apache/hadoop/net/StaticMapping.java  |   5 +
 .../apache/hadoop/net/TestSwitchMapping.java  |   4 +
 .../apache/hadoop/net/TestTableMapping.java   |  74 +++++--
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |   3 +
 .../blockmanagement/DatanodeManager.java      | 203 ++++++++++--------
 .../hadoop/net/TestNetworkTopology.java       |  72 +++++++
 .../v2/hs/TestJobHistoryParsing.java          |   4 +
 .../hadoop/yarn/util/TestRackResolver.java    |   4 +
 14 files changed, 341 insertions(+), 131 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
index a447c6a084f..e152c549ffc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
@@ -149,4 +149,9 @@ public class CachedDNSToSwitchMapping extends AbstractDNSToSwitchMapping {
   public boolean isSingleSwitch() {
     return isMappingSingleSwitch(rawMapping);
   }
+
+  @Override
+  public void reloadCachedMappings() {
+    cache.clear();
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSToSwitchMapping.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSToSwitchMapping.java
index 8521a9a92be..ccc109302fd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSToSwitchMapping.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNSToSwitchMapping.java
@@ -51,4 +51,12 @@ public interface DNSToSwitchMapping {
    * If names is empty, the returned list is also empty
    */
  public List<String> resolve(List<String> names);
+
+  /**
+   * Reload all of the cached mappings.
+   *
+   * If there is a cache, this method will clear it, so that future accesses
+   * will get a chance to see the new data.
+   */
+  public void reloadCachedMappings();
 }
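The new interface method keeps the contract deliberately small: resolve() may serve cached answers, and reloadCachedMappings() must discard anything stale so that later lookups can observe corrected rack assignments. As an illustration only (the class name and the lookup() slow path below are hypothetical, not part of this patch), a conforming implementation might look like:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.hadoop.net.DNSToSwitchMapping;
    import org.apache.hadoop.net.NetworkTopology;

    public class ExampleCachingMapping implements DNSToSwitchMapping {
      private final Map<String, String> cache =
          new ConcurrentHashMap<String, String>();

      @Override
      public List<String> resolve(List<String> names) {
        List<String> racks = new ArrayList<String>(names.size());
        for (String name : names) {
          String rack = cache.get(name);
          if (rack == null) {
            rack = lookup(name); // hypothetical slow path
            cache.put(name, rack);
          }
          racks.add(rack);
        }
        return racks;
      }

      @Override
      public void reloadCachedMappings() {
        // Honor the new contract: drop cached data so that future
        // resolve() calls can see fresh rack assignments.
        cache.clear();
      }

      private String lookup(String name) {
        return NetworkTopology.DEFAULT_RACK; // placeholder resolution
      }
    }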
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index 3f3e8ef1bf3..cf2ab286814 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -392,8 +392,16 @@ public class NetworkTopology {
       throw new IllegalArgumentException(
         "Not allow to add an inner node: "+NodeBase.getPath(node));
     }
+    int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
     netlock.writeLock().lock();
     try {
+      if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
+        LOG.error("Error: can't add leaf node at depth " +
+            newDepth + " to topology:\n" + oldTopoStr);
+        throw new InvalidTopologyException("Invalid network topology. " +
+            "You cannot have a rack and a non-rack node at the same " +
+            "level of the network topology.");
+      }
       Node rack = getNodeForNetworkLocation(node);
       if (rack != null && !(rack instanceof InnerNode)) {
         throw new IllegalArgumentException("Unexpected data node "
@@ -408,14 +416,6 @@ public class NetworkTopology {
       if (!(node instanceof InnerNode)) {
         if (depthOfAllLeaves == -1) {
           depthOfAllLeaves = node.getLevel();
-        } else {
-          if (depthOfAllLeaves != node.getLevel()) {
-            LOG.error("Error: can't add leaf node at depth " +
-                node.getLevel() + " to topology:\n" + oldTopoStr);
-            throw new InvalidTopologyException("Invalid network topology. " +
-                "You cannot have a rack and a non-rack node at the same " +
-                "level of the network topology.");
-          }
         }
       }
     }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java
index a61054d3723..9f40eeaa9b2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java
@@ -167,4 +167,16 @@ public class NodeBase implements Node {
   public void setLevel(int level) {
     this.level = level;
   }
+
+  public static int locationToDepth(String location) {
+    String normalizedLocation = normalize(location);
+    int length = normalizedLocation.length();
+    int depth = 0;
+    for (int i = 0; i < length; i++) {
+      if (normalizedLocation.charAt(i) == PATH_SEPARATOR) {
+        depth++;
+      }
+    }
+    return depth;
+  }
 }
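Together, these two changes reject a mixed-depth topology before the tree is mutated: NodeBase.locationToDepth() counts path separators, so a leaf under "/d1/r1" sits at depth 3 while a leaf under "/r2" would sit at depth 2, and the depth check now runs ahead of the insertion rather than after it. A small demonstration sketch (hostnames are made up, not from the patch):

    import org.apache.hadoop.net.NetworkTopology;
    import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
    import org.apache.hadoop.net.NodeBase;

    public class TopologyDepthDemo {
      public static void main(String[] args) {
        NetworkTopology topology = new NetworkTopology();
        topology.add(new NodeBase("host1", "/d1/r1")); // leaf at depth 3

        System.out.println(NodeBase.locationToDepth("/d1/r1")); // 2
        System.out.println(NodeBase.locationToDepth("/r2"));    // 1

        try {
          // A leaf under /r2 would sit at depth 2, not 3; it is now
          // rejected up front, before the topology is modified.
          topology.add(new NodeBase("host2", "/r2"));
        } catch (InvalidTopologyException e) {
          System.out.println("rejected: " + e.getMessage());
        }
      }
    }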
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java
index b8502d016b0..c9e62e8580c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java
@@ -263,5 +263,11 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
     public String toString() {
       return scriptName != null ?
           ("script " + scriptName) : NO_SCRIPT;
     }
+
+    @Override
+    public void reloadCachedMappings() {
+      // Nothing to do here, since RawScriptBasedMapping has no cache, and
+      // does not inherit from CachedDNSToSwitchMapping
+    }
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/TableMapping.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/TableMapping.java
index b245c80969a..fa96476ff6e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/TableMapping.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/TableMapping.java
@@ -76,20 +76,24 @@ public class TableMapping extends CachedDNSToSwitchMapping {
     getRawMapping().setConf(conf);
   }
 
+  @Override
+  public void reloadCachedMappings() {
+    super.reloadCachedMappings();
+    getRawMapping().reloadCachedMappings();
+  }
+
   private static final class RawTableMapping extends Configured
       implements DNSToSwitchMapping {
 
-    private final Map<String, String> map = new HashMap<String, String>();
-    private boolean initialized = false;
+    private Map<String, String> map;
 
-    private synchronized void load() {
-      map.clear();
+    private Map<String, String> load() {
+      Map<String, String> loadMap = new HashMap<String, String>();
 
      String filename = getConf().get(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, null);
      if (StringUtils.isBlank(filename)) {
-        LOG.warn(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY + " not configured. "
-            + NetworkTopology.DEFAULT_RACK + " will be returned.");
-        return;
+        LOG.warn(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY + " not configured. ");
+        return null;
      }
 
      BufferedReader reader = null;
@@ -101,7 +105,7 @@ public class TableMapping extends CachedDNSToSwitchMapping {
         if (line.length() != 0 && line.charAt(0) != '#') {
           String[] columns = line.split("\\s+");
           if (columns.length == 2) {
-            map.put(columns[0], columns[1]);
+            loadMap.put(columns[0], columns[1]);
           } else {
             LOG.warn("Line does not have two columns. Ignoring. " + line);
           }
@@ -109,29 +113,31 @@ public class TableMapping extends CachedDNSToSwitchMapping {
         line = reader.readLine();
       }
     } catch (Exception e) {
-      LOG.warn(filename + " cannot be read. " + NetworkTopology.DEFAULT_RACK
-          + " will be returned.", e);
-      map.clear();
+      LOG.warn(filename + " cannot be read.", e);
+      return null;
     } finally {
       if (reader != null) {
         try {
           reader.close();
         } catch (IOException e) {
-          LOG.warn(filename + " cannot be read. "
-              + NetworkTopology.DEFAULT_RACK + " will be returned.", e);
-          map.clear();
+          LOG.warn(filename + " cannot be read.", e);
+          return null;
         }
       }
     }
+    return loadMap;
   }
 
   @Override
   public synchronized List<String> resolve(List<String> names) {
-    if (!initialized) {
-      initialized = true;
-      load();
+    if (map == null) {
+      map = load();
+      if (map == null) {
+        LOG.warn("Failed to read topology table. " +
+            NetworkTopology.DEFAULT_RACK + " will be used for all nodes.");
+        map = new HashMap<String, String>();
+      }
     }
-
     List<String> results = new ArrayList<String>(names.size());
     for (String name : names) {
       String result = map.get(name);
@@ -143,6 +149,18 @@ public class TableMapping extends CachedDNSToSwitchMapping {
       }
       return results;
     }
-
+
+    @Override
+    public void reloadCachedMappings() {
+      Map<String, String> newMap = load();
+      if (newMap == null) {
+        LOG.error("Failed to reload the topology table.  The cached " +
+            "mappings will not be cleared.");
+      } else {
+        synchronized(this) {
+          map = newMap;
+        }
+      }
+    }
   }
 }
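RawTableMapping parses a plain-text table of two whitespace-separated columns (host or IP, then rack); lines starting with '#' and malformed lines are skipped. The rework above also changes the reload semantics: load() now builds a fresh map that is swapped in only on success, so a missing or unreadable file during reload keeps the last good table instead of wiping it. A usage sketch (the file path below is hypothetical; the key constant is the one the tests import statically from CommonConfigurationKeysPublic):

    import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY;

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.TableMapping;

    public class TableMappingDemo {
      public static void main(String[] args) {
        // /etc/hadoop/topology.table (hypothetical path) might contain:
        //   # host-or-ip    rack
        //   a.b.c           /rack1
        //   1.2.3.4         /rack2
        Configuration conf = new Configuration();
        conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY,
            "/etc/hadoop/topology.table");
        TableMapping mapping = new TableMapping();
        mapping.setConf(conf);

        // Hosts missing from the table fall back to
        // NetworkTopology.DEFAULT_RACK ("/default-rack").
        List<String> racks =
            mapping.resolve(Arrays.asList("a.b.c", "unknown.host"));
        System.out.println(racks); // [/rack1, /default-rack]

        // After editing the table file, pick up the new contents
        // without a restart.
        mapping.reloadCachedMappings();
      }
    }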
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/StaticMapping.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/StaticMapping.java
index 4204e2b6245..5492b47c8b7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/StaticMapping.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/StaticMapping.java
@@ -147,4 +147,9 @@ public class StaticMapping extends AbstractDNSToSwitchMapping {
       nameToRackMap.clear();
     }
   }
+
+  public void reloadCachedMappings() {
+    // reloadCachedMappings does nothing for StaticMapping; there is
+    // nowhere to reload from since all data is in memory.
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSwitchMapping.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSwitchMapping.java
index c2fcf172c0c..1caa454243f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSwitchMapping.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSwitchMapping.java
@@ -116,5 +116,9 @@ public class TestSwitchMapping extends Assert {
     public List<String> resolve(List<String> names) {
       return names;
     }
+
+    @Override
+    public void reloadCachedMappings() {
+    }
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestTableMapping.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestTableMapping.java
index 6356555da42..5281694179c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestTableMapping.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestTableMapping.java
@@ -34,23 +34,17 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class TestTableMapping {
-
-  private File mappingFile;
-
-  @Before
-  public void setUp() throws IOException {
-    mappingFile = File.createTempFile(getClass().getSimpleName(), ".txt");
-    Files.write("a.b.c /rack1\n" +
-                "1.2.3.4\t/rack2\n", mappingFile, Charsets.UTF_8);
-    mappingFile.deleteOnExit();
-  }
-
   @Test
   public void testResolve() throws IOException {
+    File mapFile = File.createTempFile(getClass().getSimpleName() +
+        ".testResolve", ".txt");
+    Files.write("a.b.c /rack1\n" +
+        "1.2.3.4\t/rack2\n", mapFile, Charsets.UTF_8);
+    mapFile.deleteOnExit();
     TableMapping mapping = new TableMapping();
 
     Configuration conf = new Configuration();
-    conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mappingFile.getCanonicalPath());
+    conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile.getCanonicalPath());
     mapping.setConf(conf);
 
     List<String> names = new ArrayList<String>();
@@ -65,10 +59,15 @@ public class TestTableMapping {
 
   @Test
   public void testTableCaching() throws IOException {
+    File mapFile = File.createTempFile(getClass().getSimpleName() +
+        ".testTableCaching", ".txt");
+    Files.write("a.b.c /rack1\n" +
+        "1.2.3.4\t/rack2\n", mapFile, Charsets.UTF_8);
+    mapFile.deleteOnExit();
     TableMapping mapping = new TableMapping();
 
     Configuration conf = new Configuration();
-    conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mappingFile.getCanonicalPath());
+    conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile.getCanonicalPath());
     mapping.setConf(conf);
 
     List<String> names = new ArrayList<String>();
@@ -123,13 +122,53 @@ public class TestTableMapping {
   }
 
   @Test
-  public void testBadFile() throws IOException {
-    Files.write("bad contents", mappingFile, Charsets.UTF_8);
-
+  public void testClearingCachedMappings() throws IOException {
+    File mapFile = File.createTempFile(getClass().getSimpleName() +
+        ".testClearingCachedMappings", ".txt");
+    Files.write("a.b.c /rack1\n" +
+        "1.2.3.4\t/rack2\n", mapFile, Charsets.UTF_8);
+    mapFile.deleteOnExit();
+
     TableMapping mapping = new TableMapping();
 
     Configuration conf = new Configuration();
-    conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mappingFile.getCanonicalPath());
+    conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile.getCanonicalPath());
+    mapping.setConf(conf);
+
+    List<String> names = new ArrayList<String>();
+    names.add("a.b.c");
+    names.add("1.2.3.4");
+
+    List<String> result = mapping.resolve(names);
+    assertEquals(names.size(), result.size());
+    assertEquals("/rack1", result.get(0));
+    assertEquals("/rack2", result.get(1));
+
+    Files.write("", mapFile, Charsets.UTF_8);
+
+    mapping.reloadCachedMappings();
+
+    names = new ArrayList<String>();
+    names.add("a.b.c");
+    names.add("1.2.3.4");
+
+    result = mapping.resolve(names);
+    assertEquals(names.size(), result.size());
+    assertEquals(NetworkTopology.DEFAULT_RACK, result.get(0));
+    assertEquals(NetworkTopology.DEFAULT_RACK, result.get(1));
+  }
+
+  @Test(timeout=60000)
+  public void testBadFile() throws IOException {
+    File mapFile = File.createTempFile(getClass().getSimpleName() +
+        ".testBadFile", ".txt");
+    Files.write("bad contents", mapFile, Charsets.UTF_8);
+    mapFile.deleteOnExit();
+
+    TableMapping mapping = new TableMapping();
+
+    Configuration conf = new Configuration();
+    conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile.getCanonicalPath());
     mapping.setConf(conf);
 
     List<String> names = new ArrayList<String>();
@@ -141,5 +180,4 @@ public class TestTableMapping {
     assertEquals(result.get(0), NetworkTopology.DEFAULT_RACK);
     assertEquals(result.get(1), NetworkTopology.DEFAULT_RACK);
   }
-
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index fea9b8e8973..8ae1682ac35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -363,6 +363,9 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-4569. Small image transfer related cleanups.
     (Andrew Wang via suresh)
 
+    HDFS-4521. Invalid network topologies should not be cached. (Colin Patrick
+    McCabe via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index e4b9ffb8dd8..525bf602cd1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
@@ -431,8 +432,8 @@ public class DatanodeManager {
       host2DatanodeMap.remove(datanodeMap.put(node.getStorageID(), node));
     }
 
+    networktopology.add(node); // may throw InvalidTopologyException
     host2DatanodeMap.add(node);
-    networktopology.add(node);
     checkIfClusterIsNowMultiRack(node);
 
     if (LOG.isDebugEnabled()) {
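The two-line reorder in addDatanode() is an exception-safety fix: the insertion that can throw InvalidTopologyException now happens first, so a rejected node is never left behind in host2DatanodeMap. The same idea reduced to a self-contained sketch (types stubbed out; this is not the actual DatanodeManager code):

    import java.util.HashSet;
    import java.util.Set;

    public class OrderingSketch {
      private final Set<String> topology = new HashSet<String>();
      private final Set<String> hostMap = new HashSet<String>();

      void register(String host, String rack) {
        // Do the step that can fail first; nothing to undo yet.
        if (rack.isEmpty()) {
          // stand-in for NetworkTopology.InvalidTopologyException
          throw new IllegalArgumentException("invalid location for " + host);
        }
        topology.add(rack + "/" + host);
        // Only reached for valid topologies, so the two structures can
        // never disagree about membership.
        hostMap.add(host);
      }
    }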
@@ -647,92 +648,122 @@ public class DatanodeManager {
       nodeReg.setIpAddr(ip);
       nodeReg.setPeerHostName(hostname);
     }
-
-    nodeReg.setExportedKeys(blockManager.getBlockKeys());
-
-    // Checks if the node is not on the hosts list. If it is not, then
-    // it will be disallowed from registering.
-    if (!inHostsList(nodeReg)) {
-      throw new DisallowedDatanodeException(nodeReg);
-    }
-
-    NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
-        + nodeReg + " storage " + nodeReg.getStorageID());
-
-    DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
-    DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
-        nodeReg.getIpAddr(), nodeReg.getXferPort());
-
-    if (nodeN != null && nodeN != nodeS) {
-      NameNode.LOG.info("BLOCK* registerDatanode: " + nodeN);
-      // nodeN previously served a different data storage,
-      // which is not served by anybody anymore.
-      removeDatanode(nodeN);
-      // physically remove node from datanodeMap
-      wipeDatanode(nodeN);
-      nodeN = null;
-    }
-
-    if (nodeS != null) {
-      if (nodeN == nodeS) {
-        // The same datanode has been just restarted to serve the same data
-        // storage. We do not need to remove old data blocks, the delta will
-        // be calculated on the next block report from the datanode
-        if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("BLOCK* registerDatanode: "
-              + "node restarted.");
-        }
-      } else {
-        // nodeS is found
-        /* The registering datanode is a replacement node for the existing
-          data storage, which from now on will be served by a new node.
-          If this message repeats, both nodes might have same storageID
-          by (insanely rare) random chance. User needs to restart one of the
-          nodes with its data cleared (or user can just remove the StorageID
-          value in "VERSION" file under the data directory of the datanode,
-          but this is might not work if VERSION file format has changed
-        */
-        NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
-            + " is replaced by " + nodeReg + " with the same storageID "
-            + nodeReg.getStorageID());
-      }
-      // update cluster map
-      getNetworkTopology().remove(nodeS);
-      nodeS.updateRegInfo(nodeReg);
-      nodeS.setDisallowed(false); // Node is in the include list
-
-      // resolve network location
-      resolveNetworkLocation(nodeS);
-      getNetworkTopology().add(nodeS);
-
-      // also treat the registration message as a heartbeat
-      heartbeatManager.register(nodeS);
-      checkDecommissioning(nodeS);
-      return;
-    }
-
-    // this is a new datanode serving a new data storage
-    if ("".equals(nodeReg.getStorageID())) {
-      // this data storage has never been registered
-      // it is either empty or was created by pre-storageID version of DFS
-      nodeReg.setStorageID(newStorageID());
-      if (NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug(
-            "BLOCK* NameSystem.registerDatanode: "
-            + "new storageID " + nodeReg.getStorageID() + " assigned.");
-      }
-    }
-    // register new datanode
-    DatanodeDescriptor nodeDescr
-      = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
-    resolveNetworkLocation(nodeDescr);
-    addDatanode(nodeDescr);
-    checkDecommissioning(nodeDescr);
-
-    // also treat the registration message as a heartbeat
-    // no need to update its timestamp
-    // because its is done when the descriptor is created
-    heartbeatManager.addDatanode(nodeDescr);
+    try {
+      nodeReg.setExportedKeys(blockManager.getBlockKeys());
+
+      // Checks if the node is not on the hosts list. If it is not, then
+      // it will be disallowed from registering.
+      if (!inHostsList(nodeReg)) {
+        throw new DisallowedDatanodeException(nodeReg);
+      }
+
+      NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
+          + nodeReg + " storage " + nodeReg.getStorageID());
+
+      DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
+      DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
+          nodeReg.getIpAddr(), nodeReg.getXferPort());
+
+      if (nodeN != null && nodeN != nodeS) {
+        NameNode.LOG.info("BLOCK* registerDatanode: " + nodeN);
+        // nodeN previously served a different data storage,
+        // which is not served by anybody anymore.
+        removeDatanode(nodeN);
+        // physically remove node from datanodeMap
+        wipeDatanode(nodeN);
+        nodeN = null;
+      }
+
+      if (nodeS != null) {
+        if (nodeN == nodeS) {
+          // The same datanode has been just restarted to serve the same data
+          // storage. We do not need to remove old data blocks, the delta will
+          // be calculated on the next block report from the datanode
+          if(NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog.debug("BLOCK* registerDatanode: "
+                + "node restarted.");
+          }
+        } else {
+          // nodeS is found
+          /* The registering datanode is a replacement node for the existing
+            data storage, which from now on will be served by a new node.
+            If this message repeats, both nodes might have same storageID
+            by (insanely rare) random chance. User needs to restart one of the
+            nodes with its data cleared (or user can just remove the StorageID
+            value in "VERSION" file under the data directory of the datanode,
+            but this is might not work if VERSION file format has changed
+          */
+          NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
+              + " is replaced by " + nodeReg + " with the same storageID "
+              + nodeReg.getStorageID());
+        }
+
+        boolean success = false;
+        try {
+          // update cluster map
+          getNetworkTopology().remove(nodeS);
+          nodeS.updateRegInfo(nodeReg);
+          nodeS.setDisallowed(false); // Node is in the include list
+
+          // resolve network location
+          resolveNetworkLocation(nodeS);
+          getNetworkTopology().add(nodeS);
+
+          // also treat the registration message as a heartbeat
+          heartbeatManager.register(nodeS);
+          checkDecommissioning(nodeS);
+          success = true;
+        } finally {
+          if (!success) {
+            removeDatanode(nodeS);
+            wipeDatanode(nodeS);
+          }
+        }
+        return;
+      }
+
+      // this is a new datanode serving a new data storage
+      if ("".equals(nodeReg.getStorageID())) {
+        // this data storage has never been registered
+        // it is either empty or was created by pre-storageID version of DFS
+        nodeReg.setStorageID(newStorageID());
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug(
+              "BLOCK* NameSystem.registerDatanode: "
+              + "new storageID " + nodeReg.getStorageID() + " assigned.");
+        }
+      }
+
+      DatanodeDescriptor nodeDescr
+          = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
+      boolean success = false;
+      try {
+        resolveNetworkLocation(nodeDescr);
+        networktopology.add(nodeDescr);
+
+        // register new datanode
+        addDatanode(nodeDescr);
+        checkDecommissioning(nodeDescr);
+
+        // also treat the registration message as a heartbeat
+        // no need to update its timestamp
+        // because its is done when the descriptor is created
+        heartbeatManager.addDatanode(nodeDescr);
+        success = true;
+      } finally {
+        if (!success) {
+          removeDatanode(nodeDescr);
+          wipeDatanode(nodeDescr);
+        }
+      }
+    } catch (InvalidTopologyException e) {
+      // If the network location is invalid, clear the cached mappings
+      // so that we have a chance to re-add this DataNode with the
+      // correct network location later.
+      dnsToSwitchMapping.reloadCachedMappings();
+      throw e;
+    }
   }
 
   /**
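Both registration paths above guard their multi-step topology updates with the same success-flag idiom. In isolation it looks like the following (a generic sketch, not Hadoop API; the comments name the real calls each step stands in for):

    /** The rollback idiom used twice in registerDatanode(), isolated. */
    public abstract class RollbackSketch {
      abstract void stepOne();         // e.g. resolveNetworkLocation(...)
      abstract void stepTwo();         // e.g. networktopology.add(...): may throw
      abstract void undoPartialWork(); // e.g. removeDatanode + wipeDatanode

      void run() {
        boolean success = false;
        try {
          stepOne();
          stepTwo();
          success = true;
        } finally {
          // On any exception, roll back before it propagates to the
          // catch block that reloads the cached DNS-to-switch mappings.
          if (!success) {
            undoPartialWork();
          }
        }
      }
    }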
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
index 2c4721b6204..83d5a88cb23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
@@ -26,12 +26,23 @@ import static org.junit.Assert.fail;
 
 import java.util.HashMap;
 import java.util.Map;
 
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.junit.Before;
 import org.junit.Test;
 
 public class TestNetworkTopology {
+  private static final Log LOG = LogFactory.getLog(TestNetworkTopology.class);
   private final static NetworkTopology cluster = new NetworkTopology();
   private DatanodeDescriptor dataNodes[];
@@ -213,4 +224,65 @@ public class TestNetworkTopology {
       }
     }
   }
+
+  @Test(timeout=180000)
+  public void testInvalidNetworkTopologiesNotCachedInHdfs() throws Exception {
+    // start a cluster
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      // bad rack topology
+      String racks[] = { "/a/b", "/c" };
+      String hosts[] = { "foo1.example.com", "foo2.example.com" };
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).
+          racks(racks).hosts(hosts).build();
+      cluster.waitActive();
+
+      NamenodeProtocols nn = cluster.getNameNodeRpc();
+      Assert.assertNotNull(nn);
+
+      // Wait for one DataNode to register.
+      // The other DataNode will not be able to register because of the
+      // rack mismatch.
+      DatanodeInfo[] info;
+      while (true) {
+        info = nn.getDatanodeReport(DatanodeReportType.LIVE);
+        Assert.assertFalse(info.length == 2);
+        if (info.length == 1) {
+          break;
+        }
+        Thread.sleep(1000);
+      }
+      // Set the network topology of the other node to match the network
+      // topology of the node that came up.
+      int validIdx = info[0].getHostName().equals(hosts[0]) ? 0 : 1;
+      int invalidIdx = validIdx == 1 ? 0 : 1;
+      StaticMapping.addNodeToRack(hosts[invalidIdx], racks[validIdx]);
+      LOG.info("datanode " + validIdx + " came up with network location " +
+          info[0].getNetworkLocation());
+
+      // Restart the DN with the invalid topology and wait for it to
+      // register.
+      cluster.restartDataNode(invalidIdx);
+      Thread.sleep(5000);
+      while (true) {
+        info = nn.getDatanodeReport(DatanodeReportType.LIVE);
+        if (info.length == 2) {
+          break;
+        }
+        if (info.length == 0) {
+          LOG.info("got no valid DNs");
+        } else if (info.length == 1) {
+          LOG.info("got one valid DN: " + info[0].getHostName() +
+              " (at " + info[0].getNetworkLocation() + ")");
+        }
+        Thread.sleep(1000);
+      }
+      Assert.assertEquals(info[0].getNetworkLocation(),
+          info[1].getNetworkLocation());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
index 2eb6aaa133a..ae13ba0d355 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
@@ -83,6 +83,10 @@ public class TestJobHistoryParsing {
     public List<String> resolve(List<String> names) {
       return Arrays.asList(new String[]{RACK_NAME});
     }
+
+    @Override
+    public void reloadCachedMappings() {
+    }
   }
 
   @Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java
index fd2967007f4..d3ccfcaa92a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java
@@ -63,6 +63,10 @@ public class TestRackResolver {
       return returnList;
     }
 
+    @Override
+    public void reloadCachedMappings() {
+      // nothing to do here, since RawScriptBasedMapping has no cache.
+    }
   }
 
   @Test