diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 08f240c665b..013d53137f2 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -12,6 +12,9 @@ Trunk (unreleased changes) HADOOP-8469. Make NetworkTopology class pluggable. (Junping Du via szetszwo) + HADOOP-8468. Add NetworkTopologyWithNodeGroup, a 4-layer implementation + of NetworkTopology. (Junping Du via szetszwo) + IMPROVEMENTS HADOOP-8017. Configure hadoop-main pom to get rid of M2E plugin execution diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java new file mode 100644 index 00000000000..6066cd2a618 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java @@ -0,0 +1,398 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.net; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * The class extends NetworkTopology to represents a cluster of computer with + * a 4-layers hierarchical network topology. + * In this network topology, leaves represent data nodes (computers) and inner + * nodes represent switches/routers that manage traffic in/out of data centers, + * racks or physical host (with virtual switch). + * + * @see NetworkTopology + */ +@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) +@InterfaceStability.Unstable +public class NetworkTopologyWithNodeGroup extends NetworkTopology { + + public final static String DEFAULT_NODEGROUP = "/default-nodegroup"; + + public NetworkTopologyWithNodeGroup() { + clusterMap = new InnerNodeWithNodeGroup(InnerNode.ROOT); + } + + @Override + protected Node getNodeForNetworkLocation(Node node) { + // if node only with default rack info, here we need to add default + // nodegroup info + if (NetworkTopology.DEFAULT_RACK.equals(node.getNetworkLocation())) { + node.setNetworkLocation(node.getNetworkLocation() + + DEFAULT_NODEGROUP); + } + Node nodeGroup = getNode(node.getNetworkLocation()); + if (nodeGroup == null) { + nodeGroup = new InnerNode(node.getNetworkLocation()); + } + return getNode(nodeGroup.getNetworkLocation()); + } + + @Override + public String getRack(String loc) { + netlock.readLock().lock(); + try { + loc = InnerNode.normalize(loc); + Node locNode = getNode(loc); + if (locNode instanceof InnerNodeWithNodeGroup) { + InnerNodeWithNodeGroup node = (InnerNodeWithNodeGroup) locNode; + if (node.isRack()) { + return loc; + } else if (node.isNodeGroup()) { + return node.getNetworkLocation(); + } else { + // may be a data center + return null; + } + } else { + // not in cluster map, don't handle it + return loc; + } + } finally { + netlock.readLock().unlock(); + } + } + + /** + * Given a string representation of a node group for a specific network + * location + * + * @param loc + * a path-like string representation of a network location + * @return a node group string + */ + public String getNodeGroup(String loc) { + netlock.readLock().lock(); + try { + loc = InnerNode.normalize(loc); + Node locNode = getNode(loc); + if (locNode instanceof InnerNodeWithNodeGroup) { + InnerNodeWithNodeGroup node = (InnerNodeWithNodeGroup) locNode; + if (node.isNodeGroup()) { + return loc; + } else if (node.isRack()) { + // not sure the node group for a rack + return null; + } else { + // may be a leaf node + return getNodeGroup(node.getNetworkLocation()); + } + } else { + // not in cluster map, don't handle it + return loc; + } + } finally { + netlock.readLock().unlock(); + } + } + + @Override + public boolean isOnSameRack( Node node1, Node node2) { + if (node1 == null || node2 == null || + node1.getParent() == null || node2.getParent() == null) { + return false; + } + + netlock.readLock().lock(); + try { + return isSameParents(node1.getParent(), node2.getParent()); + } finally { + netlock.readLock().unlock(); + } + } + + /** + * Check if two nodes are on the same node group (hypervisor) The + * assumption here is: each nodes are leaf nodes. + * + * @param node1 + * one node (can be null) + * @param node2 + * another node (can be null) + * @return true if node1 and node2 are on the same node group; false + * otherwise + * @exception IllegalArgumentException + * when either node1 or node2 is null, or node1 or node2 do + * not belong to the cluster + */ + @Override + public boolean isOnSameNodeGroup(Node node1, Node node2) { + if (node1 == null || node2 == null) { + return false; + } + netlock.readLock().lock(); + try { + return isSameParents(node1, node2); + } finally { + netlock.readLock().unlock(); + } + } + + /** + * Check if network topology is aware of NodeGroup + */ + @Override + public boolean isNodeGroupAware() { + return true; + } + + /** Add a leaf node + * Update node counter & rack counter if necessary + * @param node node to be added; can be null + * @exception IllegalArgumentException if add a node to a leave + * or node to be added is not a leaf + */ + @Override + public void add(Node node) { + if (node==null) return; + if( node instanceof InnerNode ) { + throw new IllegalArgumentException( + "Not allow to add an inner node: "+NodeBase.getPath(node)); + } + netlock.writeLock().lock(); + try { + Node rack = null; + + // if node only with default rack info, here we need to add default + // nodegroup info + if (NetworkTopology.DEFAULT_RACK.equals(node.getNetworkLocation())) { + node.setNetworkLocation(node.getNetworkLocation() + + NetworkTopologyWithNodeGroup.DEFAULT_NODEGROUP); + } + Node nodeGroup = getNode(node.getNetworkLocation()); + if (nodeGroup == null) { + nodeGroup = new InnerNodeWithNodeGroup(node.getNetworkLocation()); + } + rack = getNode(nodeGroup.getNetworkLocation()); + + if (rack != null && !(rack instanceof InnerNode)) { + throw new IllegalArgumentException("Unexpected data node " + + node.toString() + + " at an illegal network location"); + } + if (clusterMap.add(node)) { + LOG.info("Adding a new node: " + NodeBase.getPath(node)); + if (rack == null) { + // We only track rack number here + numOfRacks++; + } + } + if(LOG.isDebugEnabled()) { + LOG.debug("NetworkTopology became:\n" + this.toString()); + } + } finally { + netlock.writeLock().unlock(); + } + } + + /** Remove a node + * Update node counter and rack counter if necessary + * @param node node to be removed; can be null + */ + @Override + public void remove(Node node) { + if (node==null) return; + if( node instanceof InnerNode ) { + throw new IllegalArgumentException( + "Not allow to remove an inner node: "+NodeBase.getPath(node)); + } + LOG.info("Removing a node: "+NodeBase.getPath(node)); + netlock.writeLock().lock(); + try { + if (clusterMap.remove(node)) { + Node nodeGroup = getNode(node.getNetworkLocation()); + if (nodeGroup == null) { + nodeGroup = new InnerNode(node.getNetworkLocation()); + } + InnerNode rack = (InnerNode)getNode(nodeGroup.getNetworkLocation()); + if (rack == null) { + numOfRacks--; + } + } + if(LOG.isDebugEnabled()) { + LOG.debug("NetworkTopology became:\n" + this.toString()); + } + } finally { + netlock.writeLock().unlock(); + } + } + + /** Sort nodes array by their distances to reader + * It linearly scans the array, if a local node is found, swap it with + * the first element of the array. + * If a local node group node is found, swap it with the first element + * following the local node. + * If a local rack node is found, swap it with the first element following + * the local node group node. + * If neither local node, node group node or local rack node is found, put a + * random replica location at position 0. + * It leaves the rest nodes untouched. + * @param reader the node that wishes to read a block from one of the nodes + * @param nodes the list of nodes containing data for the reader + */ + @Override + public void pseudoSortByDistance( Node reader, Node[] nodes ) { + + if (reader != null && !this.contains(reader)) { + // if reader is not a datanode (not in NetworkTopology tree), we will + // replace this reader with a sibling leaf node in tree. + Node nodeGroup = getNode(reader.getNetworkLocation()); + if (nodeGroup != null && nodeGroup instanceof InnerNode) { + InnerNode parentNode = (InnerNode) nodeGroup; + // replace reader with the first children of its parent in tree + reader = parentNode.getLeaf(0, null); + } else { + return; + } + } + int tempIndex = 0; + int localRackNode = -1; + int localNodeGroupNode = -1; + if (reader != null) { + //scan the array to find the local node & local rack node + for (int i = 0; i < nodes.length; i++) { + if (tempIndex == 0 && reader == nodes[i]) { //local node + //swap the local node and the node at position 0 + if (i != 0) { + swap(nodes, tempIndex, i); + } + tempIndex=1; + + if (localRackNode != -1 && (localNodeGroupNode !=-1)) { + if (localRackNode == 0) { + localRackNode = i; + } + if (localNodeGroupNode == 0) { + localNodeGroupNode = i; + } + break; + } + } else if (localNodeGroupNode == -1 && isOnSameNodeGroup(reader, + nodes[i])) { + //local node group + localNodeGroupNode = i; + // node local and rack local are already found + if(tempIndex != 0 && localRackNode != -1) break; + } else if (localRackNode == -1 && isOnSameRack(reader, nodes[i])) { + localRackNode = i; + if (tempIndex != 0 && localNodeGroupNode != -1) break; + } + } + + // swap the local nodegroup node and the node at position tempIndex + if(localNodeGroupNode != -1 && localNodeGroupNode != tempIndex) { + swap(nodes, tempIndex, localNodeGroupNode); + if (localRackNode == tempIndex) { + localRackNode = localNodeGroupNode; + } + tempIndex++; + } + + // swap the local rack node and the node at position tempIndex + if(localRackNode != -1 && localRackNode != tempIndex) { + swap(nodes, tempIndex, localRackNode); + tempIndex++; + } + } + + // put a random node at position 0 if there is not a local/local-nodegroup/ + // local-rack node + if (tempIndex == 0 && localNodeGroupNode == -1 && localRackNode == -1 + && nodes.length != 0) { + swap(nodes, 0, r.nextInt(nodes.length)); + } + } + + /** InnerNodeWithNodeGroup represents a switch/router of a data center, rack + * or physical host. Different from a leaf node, it has non-null children. + */ + static class InnerNodeWithNodeGroup extends InnerNode { + public InnerNodeWithNodeGroup(String name, String location, + InnerNode parent, int level) { + super(name, location, parent, level); + } + + public InnerNodeWithNodeGroup(String name, String location) { + super(name, location); + } + + public InnerNodeWithNodeGroup(String path) { + super(path); + } + + @Override + boolean isRack() { + // it is node group + if (getChildren().isEmpty()) { + return false; + } + + Node firstChild = children.get(0); + + if (firstChild instanceof InnerNode) { + Node firstGrandChild = (((InnerNode) firstChild).children).get(0); + if (firstGrandChild instanceof InnerNode) { + // it is datacenter + return false; + } else { + return true; + } + } + return false; + } + + /** + * Judge if this node represents a node group + * + * @return true if it has no child or its children are not InnerNodes + */ + boolean isNodeGroup() { + if (children.isEmpty()) { + return true; + } + Node firstChild = children.get(0); + if (firstChild instanceof InnerNode) { + // it is rack or datacenter + return false; + } + return true; + } + + @Override + protected InnerNode createParentNode(String parentName) { + return new InnerNodeWithNodeGroup(parentName, getPath(this), this, + this.getLevel() + 1); + } + + @Override + protected boolean areChildrenLeaves() { + return isNodeGroup(); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 1e72e362e72..c968ff2be62 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -599,10 +599,9 @@ - - + - net.topology.node.switch.mapping.impl + net.topology.node.switch.mapping.impl org.apache.hadoop.net.ScriptBasedMapping The default implementation of the DNSToSwitchMapping. It invokes a script specified in net.topology.script.file.name to resolve @@ -611,6 +610,13 @@ + + net.topology.impl + org.apache.hadoop.net.NetworkTopology + The default implementation of NetworkTopology which is classic three layer one. + + + net.topology.script.file.name diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java new file mode 100644 index 00000000000..7dbd33ab957 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.net; + +import java.util.HashMap; +import java.util.Map; + +import junit.framework.TestCase; + +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; + +public class TestNetworkTopologyWithNodeGroup extends TestCase { + private final static NetworkTopologyWithNodeGroup cluster = new + NetworkTopologyWithNodeGroup(); + + private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] { + DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/s1"), + DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/s1"), + DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/s2"), + DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2/s3"), + DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/s3"), + DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/s4"), + DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/s5"), + DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/s6") + }; + + private final static NodeBase computeNode = new NodeBase("/d1/r1/s1/h9"); + + static { + for(int i=0; i pickNodesAtRandom(int numNodes, + String excludedScope) { + Map frequency = new HashMap(); + for (DatanodeDescriptor dnd : dataNodes) { + frequency.put(dnd, 0); + } + + for (int j = 0; j < numNodes; j++) { + Node random = cluster.chooseRandom(excludedScope); + frequency.put(random, frequency.get(random) + 1); + } + return frequency; + } + + /** + * This test checks that chooseRandom works for an excluded node. + */ + public void testChooseRandomExcludedNode() { + String scope = "~" + NodeBase.getPath(dataNodes[0]); + Map frequency = pickNodesAtRandom(100, scope); + + for (Node key : dataNodes) { + // all nodes except the first should be more than zero + assertTrue(frequency.get(key) > 0 || key == dataNodes[0]); + } + } + +}