HDFS-4521. Invalid network toploogies should not be cached. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1457883 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2013-03-18 17:26:45 +00:00
parent 8e2098fd2f
commit eb6a563194
14 changed files with 341 additions and 131 deletions

View File

@ -149,4 +149,9 @@ public class CachedDNSToSwitchMapping extends AbstractDNSToSwitchMapping {
public boolean isSingleSwitch() { public boolean isSingleSwitch() {
return isMappingSingleSwitch(rawMapping); return isMappingSingleSwitch(rawMapping);
} }
@Override
public void reloadCachedMappings() {
cache.clear();
}
} }

View File

@ -51,4 +51,12 @@ public interface DNSToSwitchMapping {
* If <i>names</i> is empty, the returned list is also empty * If <i>names</i> is empty, the returned list is also empty
*/ */
public List<String> resolve(List<String> names); public List<String> resolve(List<String> names);
/**
* Reload all of the cached mappings.
*
* If there is a cache, this method will clear it, so that future accesses
* will get a chance to see the new data.
*/
public void reloadCachedMappings();
} }

View File

@ -342,8 +342,16 @@ public class NetworkTopology {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Not allow to add an inner node: "+NodeBase.getPath(node)); "Not allow to add an inner node: "+NodeBase.getPath(node));
} }
int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
netlock.writeLock().lock(); netlock.writeLock().lock();
try { try {
if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
LOG.error("Error: can't add leaf node at depth " +
newDepth + " to topology:\n" + oldTopoStr);
throw new InvalidTopologyException("Invalid network topology. " +
"You cannot have a rack and a non-rack node at the same " +
"level of the network topology.");
}
Node rack = getNode(node.getNetworkLocation()); Node rack = getNode(node.getNetworkLocation());
if (rack != null && !(rack instanceof InnerNode)) { if (rack != null && !(rack instanceof InnerNode)) {
throw new IllegalArgumentException("Unexpected data node " throw new IllegalArgumentException("Unexpected data node "
@ -358,14 +366,6 @@ public class NetworkTopology {
if (!(node instanceof InnerNode)) { if (!(node instanceof InnerNode)) {
if (depthOfAllLeaves == -1) { if (depthOfAllLeaves == -1) {
depthOfAllLeaves = node.getLevel(); depthOfAllLeaves = node.getLevel();
} else {
if (depthOfAllLeaves != node.getLevel()) {
LOG.error("Error: can't add leaf node at depth " +
node.getLevel() + " to topology:\n" + oldTopoStr);
throw new InvalidTopologyException("Invalid network topology. " +
"You cannot have a rack and a non-rack node at the same " +
"level of the network topology.");
}
} }
} }
} }

View File

@ -167,4 +167,16 @@ public class NodeBase implements Node {
public void setLevel(int level) { public void setLevel(int level) {
this.level = level; this.level = level;
} }
public static int locationToDepth(String location) {
String normalizedLocation = normalize(location);
int length = normalizedLocation.length();
int depth = 0;
for (int i = 0; i < length; i++) {
if (normalizedLocation.charAt(i) == PATH_SEPARATOR) {
depth++;
}
}
return depth;
}
} }

View File

@ -263,5 +263,11 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
public String toString() { public String toString() {
return scriptName != null ? ("script " + scriptName) : NO_SCRIPT; return scriptName != null ? ("script " + scriptName) : NO_SCRIPT;
} }
@Override
public void reloadCachedMappings() {
// Nothing to do here, since RawScriptBasedMapping has no cache, and
// does not inherit from CachedDNSToSwitchMapping
}
} }
} }

View File

@ -76,20 +76,24 @@ public class TableMapping extends CachedDNSToSwitchMapping {
getRawMapping().setConf(conf); getRawMapping().setConf(conf);
} }
@Override
public void reloadCachedMappings() {
super.reloadCachedMappings();
getRawMapping().reloadCachedMappings();
}
private static final class RawTableMapping extends Configured private static final class RawTableMapping extends Configured
implements DNSToSwitchMapping { implements DNSToSwitchMapping {
private final Map<String, String> map = new HashMap<String, String>(); private Map<String, String> map;
private boolean initialized = false;
private synchronized void load() { private Map<String, String> load() {
map.clear(); Map<String, String> loadMap = new HashMap<String, String>();
String filename = getConf().get(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, null); String filename = getConf().get(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, null);
if (StringUtils.isBlank(filename)) { if (StringUtils.isBlank(filename)) {
LOG.warn(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY + " not configured. " LOG.warn(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY + " not configured. ");
+ NetworkTopology.DEFAULT_RACK + " will be returned."); return null;
return;
} }
BufferedReader reader = null; BufferedReader reader = null;
@ -101,7 +105,7 @@ public class TableMapping extends CachedDNSToSwitchMapping {
if (line.length() != 0 && line.charAt(0) != '#') { if (line.length() != 0 && line.charAt(0) != '#') {
String[] columns = line.split("\\s+"); String[] columns = line.split("\\s+");
if (columns.length == 2) { if (columns.length == 2) {
map.put(columns[0], columns[1]); loadMap.put(columns[0], columns[1]);
} else { } else {
LOG.warn("Line does not have two columns. Ignoring. " + line); LOG.warn("Line does not have two columns. Ignoring. " + line);
} }
@ -109,29 +113,31 @@ public class TableMapping extends CachedDNSToSwitchMapping {
line = reader.readLine(); line = reader.readLine();
} }
} catch (Exception e) { } catch (Exception e) {
LOG.warn(filename + " cannot be read. " + NetworkTopology.DEFAULT_RACK LOG.warn(filename + " cannot be read.", e);
+ " will be returned.", e); return null;
map.clear();
} finally { } finally {
if (reader != null) { if (reader != null) {
try { try {
reader.close(); reader.close();
} catch (IOException e) { } catch (IOException e) {
LOG.warn(filename + " cannot be read. " LOG.warn(filename + " cannot be read.", e);
+ NetworkTopology.DEFAULT_RACK + " will be returned.", e); return null;
map.clear();
} }
} }
} }
return loadMap;
} }
@Override @Override
public synchronized List<String> resolve(List<String> names) { public synchronized List<String> resolve(List<String> names) {
if (!initialized) { if (map == null) {
initialized = true; map = load();
load(); if (map == null) {
LOG.warn("Failed to read topology table. " +
NetworkTopology.DEFAULT_RACK + " will be used for all nodes.");
map = new HashMap<String, String>();
}
} }
List<String> results = new ArrayList<String>(names.size()); List<String> results = new ArrayList<String>(names.size());
for (String name : names) { for (String name : names) {
String result = map.get(name); String result = map.get(name);
@ -144,5 +150,17 @@ public class TableMapping extends CachedDNSToSwitchMapping {
return results; return results;
} }
@Override
public void reloadCachedMappings() {
Map<String, String> newMap = load();
if (newMap == null) {
LOG.error("Failed to reload the topology table. The cached " +
"mappings will not be cleared.");
} else {
synchronized(this) {
map = newMap;
}
}
}
} }
} }

View File

@ -147,4 +147,9 @@ public class StaticMapping extends AbstractDNSToSwitchMapping {
nameToRackMap.clear(); nameToRackMap.clear();
} }
} }
public void reloadCachedMappings() {
// reloadCachedMappings does nothing for StaticMapping; there is
// nowhere to reload from since all data is in memory.
}
} }

View File

@ -116,5 +116,9 @@ public class TestSwitchMapping extends Assert {
public List<String> resolve(List<String> names) { public List<String> resolve(List<String> names) {
return names; return names;
} }
@Override
public void reloadCachedMappings() {
}
} }
} }

View File

@ -34,23 +34,17 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestTableMapping { public class TestTableMapping {
private File mappingFile;
@Before
public void setUp() throws IOException {
mappingFile = File.createTempFile(getClass().getSimpleName(), ".txt");
Files.write("a.b.c /rack1\n" +
"1.2.3.4\t/rack2\n", mappingFile, Charsets.UTF_8);
mappingFile.deleteOnExit();
}
@Test @Test
public void testResolve() throws IOException { public void testResolve() throws IOException {
File mapFile = File.createTempFile(getClass().getSimpleName() +
".testResolve", ".txt");
Files.write("a.b.c /rack1\n" +
"1.2.3.4\t/rack2\n", mapFile, Charsets.UTF_8);
mapFile.deleteOnExit();
TableMapping mapping = new TableMapping(); TableMapping mapping = new TableMapping();
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mappingFile.getCanonicalPath()); conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile.getCanonicalPath());
mapping.setConf(conf); mapping.setConf(conf);
List<String> names = new ArrayList<String>(); List<String> names = new ArrayList<String>();
@ -65,10 +59,15 @@ public class TestTableMapping {
@Test @Test
public void testTableCaching() throws IOException { public void testTableCaching() throws IOException {
File mapFile = File.createTempFile(getClass().getSimpleName() +
".testTableCaching", ".txt");
Files.write("a.b.c /rack1\n" +
"1.2.3.4\t/rack2\n", mapFile, Charsets.UTF_8);
mapFile.deleteOnExit();
TableMapping mapping = new TableMapping(); TableMapping mapping = new TableMapping();
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mappingFile.getCanonicalPath()); conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile.getCanonicalPath());
mapping.setConf(conf); mapping.setConf(conf);
List<String> names = new ArrayList<String>(); List<String> names = new ArrayList<String>();
@ -123,13 +122,53 @@ public class TestTableMapping {
} }
@Test @Test
public void testBadFile() throws IOException { public void testClearingCachedMappings() throws IOException {
Files.write("bad contents", mappingFile, Charsets.UTF_8); File mapFile = File.createTempFile(getClass().getSimpleName() +
".testClearingCachedMappings", ".txt");
Files.write("a.b.c /rack1\n" +
"1.2.3.4\t/rack2\n", mapFile, Charsets.UTF_8);
mapFile.deleteOnExit();
TableMapping mapping = new TableMapping(); TableMapping mapping = new TableMapping();
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mappingFile.getCanonicalPath()); conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile.getCanonicalPath());
mapping.setConf(conf);
List<String> names = new ArrayList<String>();
names.add("a.b.c");
names.add("1.2.3.4");
List<String> result = mapping.resolve(names);
assertEquals(names.size(), result.size());
assertEquals("/rack1", result.get(0));
assertEquals("/rack2", result.get(1));
Files.write("", mapFile, Charsets.UTF_8);
mapping.reloadCachedMappings();
names = new ArrayList<String>();
names.add("a.b.c");
names.add("1.2.3.4");
result = mapping.resolve(names);
assertEquals(names.size(), result.size());
assertEquals(NetworkTopology.DEFAULT_RACK, result.get(0));
assertEquals(NetworkTopology.DEFAULT_RACK, result.get(1));
}
@Test(timeout=60000)
public void testBadFile() throws IOException {
File mapFile = File.createTempFile(getClass().getSimpleName() +
".testBadFile", ".txt");
Files.write("bad contents", mapFile, Charsets.UTF_8);
mapFile.deleteOnExit();
TableMapping mapping = new TableMapping();
Configuration conf = new Configuration();
conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile.getCanonicalPath());
mapping.setConf(conf); mapping.setConf(conf);
List<String> names = new ArrayList<String>(); List<String> names = new ArrayList<String>();
@ -141,5 +180,4 @@ public class TestTableMapping {
assertEquals(result.get(0), NetworkTopology.DEFAULT_RACK); assertEquals(result.get(0), NetworkTopology.DEFAULT_RACK);
assertEquals(result.get(1), NetworkTopology.DEFAULT_RACK); assertEquals(result.get(1), NetworkTopology.DEFAULT_RACK);
} }
} }

View File

@ -23,6 +23,9 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4569. Small image transfer related cleanups. HDFS-4569. Small image transfer related cleanups.
(Andrew Wang via suresh) (Andrew Wang via suresh)
HDFS-4521. Invalid network toploogies should not be cached. (Colin Patrick
McCabe via atm)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.CachedDNSToSwitchMapping; import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.net.ScriptBasedMapping; import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.HostsFileReader;
@ -418,8 +419,8 @@ public class DatanodeManager {
host2DatanodeMap.remove(datanodeMap.put(node.getStorageID(), node)); host2DatanodeMap.remove(datanodeMap.put(node.getStorageID(), node));
} }
networktopology.add(node); // may throw InvalidTopologyException
host2DatanodeMap.add(node); host2DatanodeMap.add(node);
networktopology.add(node);
checkIfClusterIsNowMultiRack(node); checkIfClusterIsNowMultiRack(node);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -635,6 +636,7 @@ public class DatanodeManager {
nodeReg.setPeerHostName(hostname); nodeReg.setPeerHostName(hostname);
} }
try {
nodeReg.setExportedKeys(blockManager.getBlockKeys()); nodeReg.setExportedKeys(blockManager.getBlockKeys());
// Checks if the node is not on the hosts list. If it is not, then // Checks if the node is not on the hosts list. If it is not, then
@ -683,6 +685,9 @@ public class DatanodeManager {
+ " is replaced by " + nodeReg + " with the same storageID " + " is replaced by " + nodeReg + " with the same storageID "
+ nodeReg.getStorageID()); + nodeReg.getStorageID());
} }
boolean success = false;
try {
// update cluster map // update cluster map
getNetworkTopology().remove(nodeS); getNetworkTopology().remove(nodeS);
nodeS.updateRegInfo(nodeReg); nodeS.updateRegInfo(nodeReg);
@ -695,6 +700,13 @@ public class DatanodeManager {
// also treat the registration message as a heartbeat // also treat the registration message as a heartbeat
heartbeatManager.register(nodeS); heartbeatManager.register(nodeS);
checkDecommissioning(nodeS); checkDecommissioning(nodeS);
success = true;
} finally {
if (!success) {
removeDatanode(nodeS);
wipeDatanode(nodeS);
}
}
return; return;
} }
@ -709,10 +721,15 @@ public class DatanodeManager {
+ "new storageID " + nodeReg.getStorageID() + " assigned."); + "new storageID " + nodeReg.getStorageID() + " assigned.");
} }
} }
// register new datanode
DatanodeDescriptor nodeDescr DatanodeDescriptor nodeDescr
= new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK); = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
boolean success = false;
try {
resolveNetworkLocation(nodeDescr); resolveNetworkLocation(nodeDescr);
networktopology.add(nodeDescr);
// register new datanode
addDatanode(nodeDescr); addDatanode(nodeDescr);
checkDecommissioning(nodeDescr); checkDecommissioning(nodeDescr);
@ -720,6 +737,20 @@ public class DatanodeManager {
// no need to update its timestamp // no need to update its timestamp
// because its is done when the descriptor is created // because its is done when the descriptor is created
heartbeatManager.addDatanode(nodeDescr); heartbeatManager.addDatanode(nodeDescr);
success = true;
} finally {
if (!success) {
removeDatanode(nodeDescr);
wipeDatanode(nodeDescr);
}
}
} catch (InvalidTopologyException e) {
// If the network location is invalid, clear the cached mappings
// so that we have a chance to re-add this DataNode with the
// correct network location later.
dnsToSwitchMapping.reloadCachedMappings();
throw e;
}
} }
/** /**

View File

@ -26,12 +26,23 @@ import static org.junit.Assert.fail;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestNetworkTopology { public class TestNetworkTopology {
private static final Log LOG = LogFactory.getLog(TestNetworkTopology.class);
private final static NetworkTopology cluster = new NetworkTopology(); private final static NetworkTopology cluster = new NetworkTopology();
private DatanodeDescriptor dataNodes[]; private DatanodeDescriptor dataNodes[];
@ -213,4 +224,65 @@ public class TestNetworkTopology {
} }
} }
} }
@Test(timeout=180000)
public void testInvalidNetworkTopologiesNotCachedInHdfs() throws Exception {
// start a cluster
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
try {
// bad rack topology
String racks[] = { "/a/b", "/c" };
String hosts[] = { "foo1.example.com", "foo2.example.com" };
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).
racks(racks).hosts(hosts).build();
cluster.waitActive();
NamenodeProtocols nn = cluster.getNameNodeRpc();
Assert.assertNotNull(nn);
// Wait for one DataNode to register.
// The other DataNode will not be able to register up because of the rack mismatch.
DatanodeInfo[] info;
while (true) {
info = nn.getDatanodeReport(DatanodeReportType.LIVE);
Assert.assertFalse(info.length == 2);
if (info.length == 1) {
break;
}
Thread.sleep(1000);
}
// Set the network topology of the other node to the match the network
// topology of the node that came up.
int validIdx = info[0].getHostName().equals(hosts[0]) ? 0 : 1;
int invalidIdx = validIdx == 1 ? 0 : 1;
StaticMapping.addNodeToRack(hosts[invalidIdx], racks[validIdx]);
LOG.info("datanode " + validIdx + " came up with network location " +
info[0].getNetworkLocation());
// Restart the DN with the invalid topology and wait for it to register.
cluster.restartDataNode(invalidIdx);
Thread.sleep(5000);
while (true) {
info = nn.getDatanodeReport(DatanodeReportType.LIVE);
if (info.length == 2) {
break;
}
if (info.length == 0) {
LOG.info("got no valid DNs");
} else if (info.length == 1) {
LOG.info("got one valid DN: " + info[0].getHostName() +
" (at " + info[0].getNetworkLocation() + ")");
}
Thread.sleep(1000);
}
Assert.assertEquals(info[0].getNetworkLocation(),
info[1].getNetworkLocation());
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
} }

View File

@ -83,6 +83,10 @@ public class TestJobHistoryParsing {
public List<String> resolve(List<String> names) { public List<String> resolve(List<String> names) {
return Arrays.asList(new String[]{RACK_NAME}); return Arrays.asList(new String[]{RACK_NAME});
} }
@Override
public void reloadCachedMappings() {
}
} }
@Test @Test

View File

@ -63,6 +63,10 @@ public class TestRackResolver {
return returnList; return returnList;
} }
@Override
public void reloadCachedMappings() {
// nothing to do here, since RawScriptBasedMapping has no cache.
}
} }
@Test @Test