HDFS-4521. Invalid network topologies should not be cached. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1457878 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 2013-03-18 17:20:13 +00:00
parent 7eb7b3b723
commit 64741f4635
14 changed files with 341 additions and 131 deletions


@@ -149,4 +149,9 @@ public class CachedDNSToSwitchMapping extends AbstractDNSToSwitchMapping {
public boolean isSingleSwitch() {
return isMappingSingleSwitch(rawMapping);
}
@Override
public void reloadCachedMappings() {
cache.clear();
}
}


@@ -51,4 +51,12 @@ public interface DNSToSwitchMapping {
* If <i>names</i> is empty, the returned list is also empty
*/
public List<String> resolve(List<String> names);
/**
* Reload all of the cached mappings.
*
* If there is a cache, this method will clear it, so that future accesses
* will get a chance to see the new data.
*/
public void reloadCachedMappings();
}
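
The contract added here is deliberately small: a mapper that keeps a cache should drop it so the next resolve() sees fresh data, and a mapper with no cache can leave the method empty. As a hedged illustration (this class is not part of the patch; the in-memory map and the DEFAULT_RACK fallback are assumptions), a caching implementation might look like this:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;

// Hypothetical example, not part of this patch: a mapper that caches
// results and honors reloadCachedMappings() by dropping its cache.
public class ExampleCachingMapping implements DNSToSwitchMapping {
  private final Map<String, String> cache = new ConcurrentHashMap<String, String>();

  @Override
  public List<String> resolve(List<String> names) {
    List<String> results = new ArrayList<String>(names.size());
    for (String name : names) {
      String location = cache.get(name);
      if (location == null) {
        location = NetworkTopology.DEFAULT_RACK; // stand-in for a real lookup
        cache.put(name, location);
      }
      results.add(location);
    }
    return results;
  }

  @Override
  public void reloadCachedMappings() {
    // Drop everything; the next resolve() call repopulates the cache.
    cache.clear();
  }
}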


@@ -392,8 +392,16 @@ public class NetworkTopology {
throw new IllegalArgumentException(
"Not allow to add an inner node: "+NodeBase.getPath(node));
}
int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
netlock.writeLock().lock();
try {
if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
LOG.error("Error: can't add leaf node at depth " +
newDepth + " to topology:\n" + oldTopoStr);
throw new InvalidTopologyException("Invalid network topology. " +
"You cannot have a rack and a non-rack node at the same " +
"level of the network topology.");
}
Node rack = getNodeForNetworkLocation(node);
if (rack != null && !(rack instanceof InnerNode)) {
throw new IllegalArgumentException("Unexpected data node "
@@ -408,14 +416,6 @@ public class NetworkTopology {
if (!(node instanceof InnerNode)) {
if (depthOfAllLeaves == -1) {
depthOfAllLeaves = node.getLevel();
} else {
if (depthOfAllLeaves != node.getLevel()) {
LOG.error("Error: can't add leaf node at depth " +
node.getLevel() + " to topology:\n" + oldTopoStr);
throw new InvalidTopologyException("Invalid network topology. " +
"You cannot have a rack and a non-rack node at the same " +
"level of the network topology.");
}
}
}
}
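
The NetworkTopology change above moves the leaf-depth check in front of any mutation of the tree, so a node at the wrong depth is rejected before it is partially added. A small sketch of the resulting behavior (host names and rack paths are invented for illustration):

import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.net.NodeBase;

public class TopologyDepthExample {
  public static void main(String[] args) {
    NetworkTopology topology = new NetworkTopology();
    // First leaf lands at depth 3: /d1/r1/host1
    topology.add(new NodeBase("host1", "/d1/r1"));
    try {
      // This leaf would land at depth 2 (/r2/host2), i.e. a rack and a
      // non-rack node at the same level, so add() now fails before the
      // tree is touched.
      topology.add(new NodeBase("host2", "/r2"));
    } catch (InvalidTopologyException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}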


@@ -167,4 +167,16 @@ public class NodeBase implements Node {
public void setLevel(int level) {
this.level = level;
}
public static int locationToDepth(String location) {
String normalizedLocation = normalize(location);
int length = normalizedLocation.length();
int depth = 0;
for (int i = 0; i < length; i++) {
if (normalizedLocation.charAt(i) == PATH_SEPARATOR) {
depth++;
}
}
return depth;
}
}
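
locationToDepth() simply counts path separators in the normalized location; the depth of a leaf placed under that location is the count plus one, which is what NetworkTopology.add() compares against depthOfAllLeaves. A quick sanity sketch (the values follow directly from the separator count):

import org.apache.hadoop.net.NodeBase;

public class LocationDepthExample {
  public static void main(String[] args) {
    System.out.println(NodeBase.locationToDepth("/d1/r1"));  // 2
    System.out.println(NodeBase.locationToDepth("/r1"));     // 1
    // normalize() strips a trailing separator, so this also prints 2
    System.out.println(NodeBase.locationToDepth("/d1/r1/"));
  }
}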


@@ -263,5 +263,11 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
public String toString() {
return scriptName != null ? ("script " + scriptName) : NO_SCRIPT;
}
@Override
public void reloadCachedMappings() {
// Nothing to do here, since RawScriptBasedMapping has no cache, and
// does not inherit from CachedDNSToSwitchMapping
}
}
}


@@ -76,20 +76,24 @@ public class TableMapping extends CachedDNSToSwitchMapping {
getRawMapping().setConf(conf);
}
@Override
public void reloadCachedMappings() {
super.reloadCachedMappings();
getRawMapping().reloadCachedMappings();
}
private static final class RawTableMapping extends Configured
implements DNSToSwitchMapping {
private final Map<String, String> map = new HashMap<String, String>();
private boolean initialized = false;
private Map<String, String> map;
private synchronized void load() {
map.clear();
private Map<String, String> load() {
Map<String, String> loadMap = new HashMap<String, String>();
String filename = getConf().get(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, null);
if (StringUtils.isBlank(filename)) {
LOG.warn(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY + " not configured. "
+ NetworkTopology.DEFAULT_RACK + " will be returned.");
return;
LOG.warn(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY + " not configured. ");
return null;
}
BufferedReader reader = null;
@@ -101,7 +105,7 @@ public class TableMapping extends CachedDNSToSwitchMapping {
if (line.length() != 0 && line.charAt(0) != '#') {
String[] columns = line.split("\\s+");
if (columns.length == 2) {
map.put(columns[0], columns[1]);
loadMap.put(columns[0], columns[1]);
} else {
LOG.warn("Line does not have two columns. Ignoring. " + line);
}
@@ -109,29 +113,31 @@ public class TableMapping extends CachedDNSToSwitchMapping {
line = reader.readLine();
}
} catch (Exception e) {
LOG.warn(filename + " cannot be read. " + NetworkTopology.DEFAULT_RACK
+ " will be returned.", e);
map.clear();
LOG.warn(filename + " cannot be read.", e);
return null;
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
LOG.warn(filename + " cannot be read. "
+ NetworkTopology.DEFAULT_RACK + " will be returned.", e);
map.clear();
LOG.warn(filename + " cannot be read.", e);
return null;
}
}
}
return loadMap;
}
@Override
public synchronized List<String> resolve(List<String> names) {
if (!initialized) {
initialized = true;
load();
if (map == null) {
map = load();
if (map == null) {
LOG.warn("Failed to read topology table. " +
NetworkTopology.DEFAULT_RACK + " will be used for all nodes.");
map = new HashMap<String, String>();
}
}
List<String> results = new ArrayList<String>(names.size());
for (String name : names) {
String result = map.get(name);
@@ -143,6 +149,18 @@ public class TableMapping extends CachedDNSToSwitchMapping {
}
return results;
}
@Override
public void reloadCachedMappings() {
Map<String, String> newMap = load();
if (newMap == null) {
LOG.error("Failed to reload the topology table. The cached " +
"mappings will not be cleared.");
} else {
synchronized(this) {
map = newMap;
}
}
}
}
}
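
With this restructuring, load() builds a fresh map and returns it (or null on failure), and reloadCachedMappings() only swaps the new map in when the reload succeeded, so a bad or missing file never wipes out a previously loaded table. A hedged standalone usage sketch (the file name and host names are made up; the config key is the same one exercised in the tests later in this diff):

import java.io.File;
import java.util.Arrays;
import java.util.List;

import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.TableMapping;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY;

public class TableMappingExample {
  public static void main(String[] args) throws Exception {
    // Two-column table: "<host-or-ip> <rack>"
    File table = File.createTempFile("topology", ".txt");
    table.deleteOnExit();
    Files.write("host1.example.com /rack1\nhost2.example.com /rack2\n",
        table, Charsets.UTF_8);

    Configuration conf = new Configuration();
    conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, table.getCanonicalPath());

    TableMapping mapping = new TableMapping();
    mapping.setConf(conf);

    List<String> racks =
        mapping.resolve(Arrays.asList("host1.example.com", "host2.example.com"));
    System.out.println(racks); // [/rack1, /rack2]

    // After editing the file on disk, pick up the new contents in place;
    // if the reload fails, the previously loaded table is kept.
    mapping.reloadCachedMappings();
  }
}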


@@ -147,4 +147,9 @@ public class StaticMapping extends AbstractDNSToSwitchMapping {
nameToRackMap.clear();
}
}
public void reloadCachedMappings() {
// reloadCachedMappings does nothing for StaticMapping; there is
// nowhere to reload from since all data is in memory.
}
}


@@ -116,5 +116,9 @@ public class TestSwitchMapping extends Assert {
public List<String> resolve(List<String> names) {
return names;
}
@Override
public void reloadCachedMappings() {
}
}
}


@@ -34,23 +34,17 @@ import org.junit.Before;
import org.junit.Test;
public class TestTableMapping {
private File mappingFile;
@Before
public void setUp() throws IOException {
mappingFile = File.createTempFile(getClass().getSimpleName(), ".txt");
Files.write("a.b.c /rack1\n" +
"1.2.3.4\t/rack2\n", mappingFile, Charsets.UTF_8);
mappingFile.deleteOnExit();
}
@Test
public void testResolve() throws IOException {
File mapFile = File.createTempFile(getClass().getSimpleName() +
".testResolve", ".txt");
Files.write("a.b.c /rack1\n" +
"1.2.3.4\t/rack2\n", mapFile, Charsets.UTF_8);
mapFile.deleteOnExit();
TableMapping mapping = new TableMapping();
Configuration conf = new Configuration();
conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mappingFile.getCanonicalPath());
conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile.getCanonicalPath());
mapping.setConf(conf);
List<String> names = new ArrayList<String>();
@@ -65,10 +59,15 @@ public class TestTableMapping {
@Test
public void testTableCaching() throws IOException {
File mapFile = File.createTempFile(getClass().getSimpleName() +
".testTableCaching", ".txt");
Files.write("a.b.c /rack1\n" +
"1.2.3.4\t/rack2\n", mapFile, Charsets.UTF_8);
mapFile.deleteOnExit();
TableMapping mapping = new TableMapping();
Configuration conf = new Configuration();
conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mappingFile.getCanonicalPath());
conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile.getCanonicalPath());
mapping.setConf(conf);
List<String> names = new ArrayList<String>();
@@ -123,13 +122,53 @@ public class TestTableMapping {
}
@Test
public void testBadFile() throws IOException {
Files.write("bad contents", mappingFile, Charsets.UTF_8);
public void testClearingCachedMappings() throws IOException {
File mapFile = File.createTempFile(getClass().getSimpleName() +
".testClearingCachedMappings", ".txt");
Files.write("a.b.c /rack1\n" +
"1.2.3.4\t/rack2\n", mapFile, Charsets.UTF_8);
mapFile.deleteOnExit();
TableMapping mapping = new TableMapping();
Configuration conf = new Configuration();
conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mappingFile.getCanonicalPath());
conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile.getCanonicalPath());
mapping.setConf(conf);
List<String> names = new ArrayList<String>();
names.add("a.b.c");
names.add("1.2.3.4");
List<String> result = mapping.resolve(names);
assertEquals(names.size(), result.size());
assertEquals("/rack1", result.get(0));
assertEquals("/rack2", result.get(1));
Files.write("", mapFile, Charsets.UTF_8);
mapping.reloadCachedMappings();
names = new ArrayList<String>();
names.add("a.b.c");
names.add("1.2.3.4");
result = mapping.resolve(names);
assertEquals(names.size(), result.size());
assertEquals(NetworkTopology.DEFAULT_RACK, result.get(0));
assertEquals(NetworkTopology.DEFAULT_RACK, result.get(1));
}
@Test(timeout=60000)
public void testBadFile() throws IOException {
File mapFile = File.createTempFile(getClass().getSimpleName() +
".testBadFile", ".txt");
Files.write("bad contents", mapFile, Charsets.UTF_8);
mapFile.deleteOnExit();
TableMapping mapping = new TableMapping();
Configuration conf = new Configuration();
conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile.getCanonicalPath());
mapping.setConf(conf);
List<String> names = new ArrayList<String>();
@@ -141,5 +180,4 @@ public class TestTableMapping {
assertEquals(result.get(0), NetworkTopology.DEFAULT_RACK);
assertEquals(result.get(1), NetworkTopology.DEFAULT_RACK);
}
}


@@ -363,6 +363,9 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4569. Small image transfer related cleanups.
(Andrew Wang via suresh)
HDFS-4521. Invalid network topologies should not be cached. (Colin Patrick
McCabe via atm)
OPTIMIZATIONS
BUG FIXES


@@ -66,6 +66,7 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
@@ -431,8 +432,8 @@ public class DatanodeManager {
host2DatanodeMap.remove(datanodeMap.put(node.getStorageID(), node));
}
networktopology.add(node); // may throw InvalidTopologyException
host2DatanodeMap.add(node);
networktopology.add(node);
checkIfClusterIsNowMultiRack(node);
if (LOG.isDebugEnabled()) {
@@ -647,92 +648,122 @@ public class DatanodeManager {
nodeReg.setIpAddr(ip);
nodeReg.setPeerHostName(hostname);
}
nodeReg.setExportedKeys(blockManager.getBlockKeys());
// Checks if the node is not on the hosts list. If it is not, then
// it will be disallowed from registering.
if (!inHostsList(nodeReg)) {
throw new DisallowedDatanodeException(nodeReg);
}
NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
+ nodeReg + " storage " + nodeReg.getStorageID());
DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
nodeReg.getIpAddr(), nodeReg.getXferPort());
if (nodeN != null && nodeN != nodeS) {
NameNode.LOG.info("BLOCK* registerDatanode: " + nodeN);
// nodeN previously served a different data storage,
// which is not served by anybody anymore.
removeDatanode(nodeN);
// physically remove node from datanodeMap
wipeDatanode(nodeN);
nodeN = null;
}
if (nodeS != null) {
if (nodeN == nodeS) {
// The same datanode has been just restarted to serve the same data
// storage. We do not need to remove old data blocks, the delta will
// be calculated on the next block report from the datanode
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* registerDatanode: "
+ "node restarted.");
}
} else {
// nodeS is found
/* The registering datanode is a replacement node for the existing
data storage, which from now on will be served by a new node.
If this message repeats, both nodes might have same storageID
by (insanely rare) random chance. User needs to restart one of the
nodes with its data cleared (or user can just remove the StorageID
value in "VERSION" file under the data directory of the datanode,
but this might not work if VERSION file format has changed
*/
NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
+ " is replaced by " + nodeReg + " with the same storageID "
+ nodeReg.getStorageID());
}
// update cluster map
getNetworkTopology().remove(nodeS);
nodeS.updateRegInfo(nodeReg);
nodeS.setDisallowed(false); // Node is in the include list
// resolve network location
resolveNetworkLocation(nodeS);
getNetworkTopology().add(nodeS);
// also treat the registration message as a heartbeat
heartbeatManager.register(nodeS);
checkDecommissioning(nodeS);
return;
}
// this is a new datanode serving a new data storage
if ("".equals(nodeReg.getStorageID())) {
// this data storage has never been registered
// it is either empty or was created by pre-storageID version of DFS
nodeReg.setStorageID(newStorageID());
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.registerDatanode: "
+ "new storageID " + nodeReg.getStorageID() + " assigned.");
}
}
// register new datanode
DatanodeDescriptor nodeDescr
= new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
resolveNetworkLocation(nodeDescr);
addDatanode(nodeDescr);
checkDecommissioning(nodeDescr);
// also treat the registration message as a heartbeat
// no need to update its timestamp
// because it is done when the descriptor is created
heartbeatManager.addDatanode(nodeDescr);
try {
nodeReg.setExportedKeys(blockManager.getBlockKeys());
// Checks if the node is not on the hosts list. If it is not, then
// it will be disallowed from registering.
if (!inHostsList(nodeReg)) {
throw new DisallowedDatanodeException(nodeReg);
}
NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
+ nodeReg + " storage " + nodeReg.getStorageID());
DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
nodeReg.getIpAddr(), nodeReg.getXferPort());
if (nodeN != null && nodeN != nodeS) {
NameNode.LOG.info("BLOCK* registerDatanode: " + nodeN);
// nodeN previously served a different data storage,
// which is not served by anybody anymore.
removeDatanode(nodeN);
// physically remove node from datanodeMap
wipeDatanode(nodeN);
nodeN = null;
}
if (nodeS != null) {
if (nodeN == nodeS) {
// The same datanode has been just restarted to serve the same data
// storage. We do not need to remove old data blocks, the delta will
// be calculated on the next block report from the datanode
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* registerDatanode: "
+ "node restarted.");
}
} else {
// nodeS is found
/* The registering datanode is a replacement node for the existing
data storage, which from now on will be served by a new node.
If this message repeats, both nodes might have same storageID
by (insanely rare) random chance. User needs to restart one of the
nodes with its data cleared (or user can just remove the StorageID
value in "VERSION" file under the data directory of the datanode,
but this might not work if VERSION file format has changed
*/
NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
+ " is replaced by " + nodeReg + " with the same storageID "
+ nodeReg.getStorageID());
}
boolean success = false;
try {
// update cluster map
getNetworkTopology().remove(nodeS);
nodeS.updateRegInfo(nodeReg);
nodeS.setDisallowed(false); // Node is in the include list
// resolve network location
resolveNetworkLocation(nodeS);
getNetworkTopology().add(nodeS);
// also treat the registration message as a heartbeat
heartbeatManager.register(nodeS);
checkDecommissioning(nodeS);
success = true;
} finally {
if (!success) {
removeDatanode(nodeS);
wipeDatanode(nodeS);
}
}
return;
}
// this is a new datanode serving a new data storage
if ("".equals(nodeReg.getStorageID())) {
// this data storage has never been registered
// it is either empty or was created by pre-storageID version of DFS
nodeReg.setStorageID(newStorageID());
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.registerDatanode: "
+ "new storageID " + nodeReg.getStorageID() + " assigned.");
}
}
DatanodeDescriptor nodeDescr
= new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
boolean success = false;
try {
resolveNetworkLocation(nodeDescr);
networktopology.add(nodeDescr);
// register new datanode
addDatanode(nodeDescr);
checkDecommissioning(nodeDescr);
// also treat the registration message as a heartbeat
// no need to update its timestamp
// because it is done when the descriptor is created
heartbeatManager.addDatanode(nodeDescr);
success = true;
} finally {
if (!success) {
removeDatanode(nodeDescr);
wipeDatanode(nodeDescr);
}
}
} catch (InvalidTopologyException e) {
// If the network location is invalid, clear the cached mappings
// so that we have a chance to re-add this DataNode with the
// correct network location later.
dnsToSwitchMapping.reloadCachedMappings();
throw e;
}
}
/**
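
The registerDatanode() change above follows one pattern in both branches: do the topology-sensitive steps inside a try, roll the DataNode back out in a finally if anything failed, and on InvalidTopologyException flush the rack-mapping cache so a corrected mapping can be picked up on the next registration attempt. A condensed, hypothetical sketch of that pattern against the public NetworkTopology and DNSToSwitchMapping APIs (the class and method names here are invented; this is not the actual DatanodeManager code):

import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.net.NodeBase;

public class RegistrationRollbackExample {
  private final NetworkTopology topology = new NetworkTopology();
  private final DNSToSwitchMapping mapping;

  RegistrationRollbackExample(DNSToSwitchMapping mapping) {
    this.mapping = mapping;
  }

  void register(NodeBase node) {
    try {
      boolean success = false;
      try {
        topology.add(node);        // may throw InvalidTopologyException
        // ... remaining registration steps would go here ...
        success = true;
      } finally {
        if (!success && topology.contains(node)) {
          topology.remove(node);   // roll back the partial registration
        }
      }
    } catch (InvalidTopologyException e) {
      // The cached rack mapping produced an invalid location; clear it so
      // this node can be re-added with a corrected location later.
      mapping.reloadCachedMappings();
      throw e;
    }
  }
}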


@@ -26,12 +26,23 @@ import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.Map;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.junit.Before;
import org.junit.Test;
public class TestNetworkTopology {
private static final Log LOG = LogFactory.getLog(TestNetworkTopology.class);
private final static NetworkTopology cluster = new NetworkTopology();
private DatanodeDescriptor dataNodes[];
@@ -213,4 +224,65 @@ public class TestNetworkTopology {
}
}
}
@Test(timeout=180000)
public void testInvalidNetworkTopologiesNotCachedInHdfs() throws Exception {
// start a cluster
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
try {
// bad rack topology
String racks[] = { "/a/b", "/c" };
String hosts[] = { "foo1.example.com", "foo2.example.com" };
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).
racks(racks).hosts(hosts).build();
cluster.waitActive();
NamenodeProtocols nn = cluster.getNameNodeRpc();
Assert.assertNotNull(nn);
// Wait for one DataNode to register.
// The other DataNode will not be able to register because of the rack mismatch.
DatanodeInfo[] info;
while (true) {
info = nn.getDatanodeReport(DatanodeReportType.LIVE);
Assert.assertFalse(info.length == 2);
if (info.length == 1) {
break;
}
Thread.sleep(1000);
}
// Set the network topology of the other node to match the network
// topology of the node that came up.
int validIdx = info[0].getHostName().equals(hosts[0]) ? 0 : 1;
int invalidIdx = validIdx == 1 ? 0 : 1;
StaticMapping.addNodeToRack(hosts[invalidIdx], racks[validIdx]);
LOG.info("datanode " + validIdx + " came up with network location " +
info[0].getNetworkLocation());
// Restart the DN with the invalid topology and wait for it to register.
cluster.restartDataNode(invalidIdx);
Thread.sleep(5000);
while (true) {
info = nn.getDatanodeReport(DatanodeReportType.LIVE);
if (info.length == 2) {
break;
}
if (info.length == 0) {
LOG.info("got no valid DNs");
} else if (info.length == 1) {
LOG.info("got one valid DN: " + info[0].getHostName() +
" (at " + info[0].getNetworkLocation() + ")");
}
Thread.sleep(1000);
}
Assert.assertEquals(info[0].getNetworkLocation(),
info[1].getNetworkLocation());
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}


@@ -83,6 +83,10 @@ public class TestJobHistoryParsing {
public List<String> resolve(List<String> names) {
return Arrays.asList(new String[]{RACK_NAME});
}
@Override
public void reloadCachedMappings() {
}
}
@Test


@@ -63,6 +63,10 @@ public class TestRackResolver {
return returnList;
}
@Override
public void reloadCachedMappings() {
// nothing to do here, since RawScriptBasedMapping has no cache.
}
}
@Test