HDFS-11419. HDFS specific network topology classes with storage type info included. Contributed by Chen Liang.
This commit is contained in:
parent
4e14eaded2
commit
eeca8b0c4e
|
@ -61,7 +61,7 @@ public interface InnerNode extends Node {
|
||||||
*
|
*
|
||||||
* @param leafIndex an indexed leaf of the node
|
* @param leafIndex an indexed leaf of the node
|
||||||
* @param excludedNode an excluded node (can be null)
|
* @param excludedNode an excluded node (can be null)
|
||||||
* @return
|
* @return the leaf node corresponding to the given index.
|
||||||
*/
|
*/
|
||||||
Node getLeaf(int leafIndex, Node excludedNode);
|
Node getLeaf(int leafIndex, Node excludedNode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,9 +25,9 @@ import java.util.Map;
|
||||||
/** InnerNode represents a switch/router of a data center or rack.
|
/** InnerNode represents a switch/router of a data center or rack.
|
||||||
* Different from a leaf node, it has non-null children.
|
* Different from a leaf node, it has non-null children.
|
||||||
*/
|
*/
|
||||||
class InnerNodeImpl extends NodeBase implements InnerNode {
|
public class InnerNodeImpl extends NodeBase implements InnerNode {
|
||||||
static class Factory implements InnerNode.Factory<InnerNodeImpl> {
|
protected static class Factory implements InnerNode.Factory<InnerNodeImpl> {
|
||||||
private Factory() {}
|
protected Factory() {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InnerNodeImpl newInnerNode(String path) {
|
public InnerNodeImpl newInnerNode(String path) {
|
||||||
|
@ -37,18 +37,18 @@ class InnerNodeImpl extends NodeBase implements InnerNode {
|
||||||
|
|
||||||
static final Factory FACTORY = new Factory();
|
static final Factory FACTORY = new Factory();
|
||||||
|
|
||||||
private final List<Node> children = new ArrayList<>();
|
protected final List<Node> children = new ArrayList<>();
|
||||||
private final Map<String, Node> childrenMap = new HashMap<>();
|
protected final Map<String, Node> childrenMap = new HashMap<>();
|
||||||
private int numOfLeaves;
|
protected int numOfLeaves;
|
||||||
|
|
||||||
/** Construct an InnerNode from a path-like string */
|
/** Construct an InnerNode from a path-like string */
|
||||||
InnerNodeImpl(String path) {
|
protected InnerNodeImpl(String path) {
|
||||||
super(path);
|
super(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Construct an InnerNode
|
/** Construct an InnerNode
|
||||||
* from its name, its network location, its parent, and its level */
|
* from its name, its network location, its parent, and its level */
|
||||||
InnerNodeImpl(String name, String location, InnerNode parent, int level) {
|
protected InnerNodeImpl(String name, String location, InnerNode parent, int level) {
|
||||||
super(name, location, parent, level);
|
super(name, location, parent, level);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,7 +81,7 @@ class InnerNodeImpl extends NodeBase implements InnerNode {
|
||||||
* @param n a node
|
* @param n a node
|
||||||
* @return true if this node is an ancestor of <i>n</i>
|
* @return true if this node is an ancestor of <i>n</i>
|
||||||
*/
|
*/
|
||||||
boolean isAncestor(Node n) {
|
protected boolean isAncestor(Node n) {
|
||||||
return getPath(this).equals(NodeBase.PATH_SEPARATOR_STR) ||
|
return getPath(this).equals(NodeBase.PATH_SEPARATOR_STR) ||
|
||||||
(n.getNetworkLocation()+NodeBase.PATH_SEPARATOR_STR).
|
(n.getNetworkLocation()+NodeBase.PATH_SEPARATOR_STR).
|
||||||
startsWith(getPath(this)+NodeBase.PATH_SEPARATOR_STR);
|
startsWith(getPath(this)+NodeBase.PATH_SEPARATOR_STR);
|
||||||
|
@ -92,12 +92,12 @@ class InnerNodeImpl extends NodeBase implements InnerNode {
|
||||||
* @param n a node
|
* @param n a node
|
||||||
* @return true if this node is the parent of <i>n</i>
|
* @return true if this node is the parent of <i>n</i>
|
||||||
*/
|
*/
|
||||||
boolean isParent(Node n) {
|
protected boolean isParent(Node n) {
|
||||||
return n.getNetworkLocation().equals(getPath(this));
|
return n.getNetworkLocation().equals(getPath(this));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Return a child name of this node who is an ancestor of node <i>n</i> */
|
/* Return a child name of this node who is an ancestor of node <i>n</i> */
|
||||||
private String getNextAncestorName(Node n) {
|
protected String getNextAncestorName(Node n) {
|
||||||
if (!isAncestor(n)) {
|
if (!isAncestor(n)) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
this + "is not an ancestor of " + n);
|
this + "is not an ancestor of " + n);
|
||||||
|
|
|
@ -68,16 +68,32 @@ public class NetworkTopology {
|
||||||
* @return an instance of NetworkTopology
|
* @return an instance of NetworkTopology
|
||||||
*/
|
*/
|
||||||
public static NetworkTopology getInstance(Configuration conf){
|
public static NetworkTopology getInstance(Configuration conf){
|
||||||
return ReflectionUtils.newInstance(
|
return getInstance(conf, InnerNodeImpl.FACTORY);
|
||||||
conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
|
|
||||||
NetworkTopology.class, NetworkTopology.class), conf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
InnerNode.Factory factory = InnerNodeImpl.FACTORY;
|
public static NetworkTopology getInstance(Configuration conf,
|
||||||
|
InnerNode.Factory factory) {
|
||||||
|
NetworkTopology nt = ReflectionUtils.newInstance(
|
||||||
|
conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
|
||||||
|
NetworkTopology.class, NetworkTopology.class), conf);
|
||||||
|
return nt.init(factory);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected NetworkTopology init(InnerNode.Factory factory) {
|
||||||
|
if (!factory.equals(this.factory)) {
|
||||||
|
// the constructor has initialized the factory to default. So only init
|
||||||
|
// again if another factory is specified.
|
||||||
|
this.factory = factory;
|
||||||
|
this.clusterMap = factory.newInnerNode(NodeBase.ROOT);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
InnerNode.Factory factory;
|
||||||
/**
|
/**
|
||||||
* the root cluster map
|
* the root cluster map
|
||||||
*/
|
*/
|
||||||
InnerNode clusterMap = factory.newInnerNode(NodeBase.ROOT);
|
InnerNode clusterMap;
|
||||||
/** Depth of all leaf nodes */
|
/** Depth of all leaf nodes */
|
||||||
private int depthOfAllLeaves = -1;
|
private int depthOfAllLeaves = -1;
|
||||||
/** rack counter */
|
/** rack counter */
|
||||||
|
@ -92,7 +108,10 @@ public class NetworkTopology {
|
||||||
/** the lock used to manage access */
|
/** the lock used to manage access */
|
||||||
protected ReadWriteLock netlock = new ReentrantReadWriteLock();
|
protected ReadWriteLock netlock = new ReentrantReadWriteLock();
|
||||||
|
|
||||||
|
// keeping the constructor because other components like MR still uses this.
|
||||||
public NetworkTopology() {
|
public NetworkTopology() {
|
||||||
|
this.factory = InnerNodeImpl.FACTORY;
|
||||||
|
this.clusterMap = factory.newInnerNode(NodeBase.ROOT);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Add a leaf node
|
/** Add a leaf node
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.math3.stat.inference.ChiSquareTest;
|
import org.apache.commons.math3.stat.inference.ChiSquareTest;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -80,7 +81,7 @@ public class TestClusterTopology extends Assert {
|
||||||
@Test
|
@Test
|
||||||
public void testCountNumNodes() throws Exception {
|
public void testCountNumNodes() throws Exception {
|
||||||
// create the topology
|
// create the topology
|
||||||
NetworkTopology cluster = new NetworkTopology();
|
NetworkTopology cluster = NetworkTopology.getInstance(new Configuration());
|
||||||
NodeElement node1 = getNewNode("node1", "/d1/r1");
|
NodeElement node1 = getNewNode("node1", "/d1/r1");
|
||||||
cluster.add(node1);
|
cluster.add(node1);
|
||||||
NodeElement node2 = getNewNode("node2", "/d1/r2");
|
NodeElement node2 = getNewNode("node2", "/d1/r2");
|
||||||
|
@ -128,7 +129,7 @@ public class TestClusterTopology extends Assert {
|
||||||
@Test
|
@Test
|
||||||
public void testChooseRandom() {
|
public void testChooseRandom() {
|
||||||
// create the topology
|
// create the topology
|
||||||
NetworkTopology cluster = new NetworkTopology();
|
NetworkTopology cluster = NetworkTopology.getInstance(new Configuration());
|
||||||
NodeElement node1 = getNewNode("node1", "/d1/r1");
|
NodeElement node1 = getNewNode("node1", "/d1/r1");
|
||||||
cluster.add(node1);
|
cluster.add(node1);
|
||||||
NodeElement node2 = getNewNode("node2", "/d1/r2");
|
NodeElement node2 = getNewNode("node2", "/d1/r2");
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -300,6 +301,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public EnumSet<StorageType> getStorageTypes() {
|
||||||
|
EnumSet<StorageType> storageTypes = EnumSet.noneOf(StorageType.class);
|
||||||
|
for (DatanodeStorageInfo dsi : getStorageInfos()) {
|
||||||
|
storageTypes.add(dsi.getStorageType());
|
||||||
|
}
|
||||||
|
return storageTypes;
|
||||||
|
}
|
||||||
|
|
||||||
public StorageReport[] getStorageReports() {
|
public StorageReport[] getStorageReports() {
|
||||||
final DatanodeStorageInfo[] infos = getStorageInfos();
|
final DatanodeStorageInfo[] infos = getStorageInfos();
|
||||||
final StorageReport[] reports = new StorageReport[infos.length];
|
final StorageReport[] reports = new StorageReport[infos.length];
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The HDFS specific network topology class. The main purpose of doing this
|
||||||
|
* subclassing is to add storage-type-aware chooseRandom method. All the
|
||||||
|
* remaining parts should be the same.
|
||||||
|
*
|
||||||
|
* Currently a placeholder to test storage type info.
|
||||||
|
* TODO : add "chooseRandom with storageType info" function.
|
||||||
|
*/
|
||||||
|
public class DFSNetworkTopology extends NetworkTopology {
|
||||||
|
public static DFSNetworkTopology getInstance(Configuration conf) {
|
||||||
|
DFSNetworkTopology nt = new DFSNetworkTopology();
|
||||||
|
return (DFSNetworkTopology)nt.init(DFSTopologyNodeImpl.FACTORY);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,253 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
|
import org.apache.hadoop.net.InnerNode;
|
||||||
|
import org.apache.hadoop.net.InnerNodeImpl;
|
||||||
|
import org.apache.hadoop.net.Node;
|
||||||
|
|
||||||
|
import java.util.EnumMap;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The HDFS-specific representation of a network topology inner node. The
|
||||||
|
* difference is this class includes the information about the storage type
|
||||||
|
* info of this subtree. This info will be used when selecting subtrees
|
||||||
|
* in block placement.
|
||||||
|
*/
|
||||||
|
public class DFSTopologyNodeImpl extends InnerNodeImpl {
|
||||||
|
|
||||||
|
static final InnerNodeImpl.Factory FACTORY
|
||||||
|
= new DFSTopologyNodeImpl.Factory();
|
||||||
|
|
||||||
|
static final class Factory extends InnerNodeImpl.Factory {
|
||||||
|
private Factory() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InnerNodeImpl newInnerNode(String path) {
|
||||||
|
return new DFSTopologyNodeImpl(path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The core data structure of this class. The information about what storage
|
||||||
|
* types this subtree has. Basically, a map whose key is a child
|
||||||
|
* id, value is a enum map including the counts of each storage type. e.g.
|
||||||
|
* DISK type has count 5 means there are 5 leaf datanodes with DISK type
|
||||||
|
* available. This value is set/updated upon datanode joining and leaving.
|
||||||
|
*
|
||||||
|
* NOTE : It might be sufficient to keep only a map from storage type
|
||||||
|
* to count, omitting the child node id. But this might make it hard to keep
|
||||||
|
* consistency when there are updates from children.
|
||||||
|
*
|
||||||
|
* For example, if currently R has two children A and B with storage X, Y, and
|
||||||
|
* A : X=1 Y=1
|
||||||
|
* B : X=2 Y=2
|
||||||
|
* so we store X=3 Y=3 as total on R.
|
||||||
|
*
|
||||||
|
* Now say A has a new X plugged in and becomes X=2 Y=1.
|
||||||
|
*
|
||||||
|
* If we know that "A adds one X", it is easy to update R by +1 on X. However,
|
||||||
|
* if we don't know "A adds one X", but instead got "A now has X=2 Y=1",
|
||||||
|
* (which seems to be the case in current heartbeat) we will not know how to
|
||||||
|
* update R. While if we store on R "A has X=1 and Y=1" then we can simply
|
||||||
|
* update R by completely replacing the A entry and all will be good.
|
||||||
|
*/
|
||||||
|
private final HashMap
|
||||||
|
<String, EnumMap<StorageType, Integer>> childrenStorageInfo;
|
||||||
|
|
||||||
|
DFSTopologyNodeImpl(String path) {
|
||||||
|
super(path);
|
||||||
|
childrenStorageInfo = new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
DFSTopologyNodeImpl(
|
||||||
|
String name, String location, InnerNode parent, int level) {
|
||||||
|
super(name, location, parent, level);
|
||||||
|
childrenStorageInfo = new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
int getNumOfChildren() {
|
||||||
|
return children.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean add(Node n) {
|
||||||
|
if (!isAncestor(n)) {
|
||||||
|
throw new IllegalArgumentException(n.getName()
|
||||||
|
+ ", which is located at " + n.getNetworkLocation()
|
||||||
|
+ ", is not a descendant of " + getPath(this));
|
||||||
|
}
|
||||||
|
// In HDFS topology, the leaf node should always be DatanodeDescriptor
|
||||||
|
if (!(n instanceof DatanodeDescriptor)) {
|
||||||
|
throw new IllegalArgumentException("Unexpected node type "
|
||||||
|
+ n.getClass().getName());
|
||||||
|
}
|
||||||
|
DatanodeDescriptor dnDescriptor = (DatanodeDescriptor) n;
|
||||||
|
if (isParent(n)) {
|
||||||
|
// this node is the parent of n; add n directly
|
||||||
|
n.setParent(this);
|
||||||
|
n.setLevel(this.level + 1);
|
||||||
|
Node prev = childrenMap.put(n.getName(), n);
|
||||||
|
if (prev != null) {
|
||||||
|
for(int i=0; i<children.size(); i++) {
|
||||||
|
if (children.get(i).getName().equals(n.getName())) {
|
||||||
|
children.set(i, n);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
children.add(n);
|
||||||
|
numOfLeaves++;
|
||||||
|
synchronized (childrenStorageInfo) {
|
||||||
|
if (!childrenStorageInfo.containsKey(dnDescriptor.getName())) {
|
||||||
|
childrenStorageInfo.put(
|
||||||
|
dnDescriptor.getName(), new EnumMap<>(StorageType.class));
|
||||||
|
}
|
||||||
|
for (StorageType st : dnDescriptor.getStorageTypes()) {
|
||||||
|
childrenStorageInfo.get(dnDescriptor.getName()).put(st, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
// find the next ancestor node
|
||||||
|
String parentName = getNextAncestorName(n);
|
||||||
|
InnerNode parentNode = (InnerNode)childrenMap.get(parentName);
|
||||||
|
if (parentNode == null) {
|
||||||
|
// create a new InnerNode
|
||||||
|
parentNode = createParentNode(parentName);
|
||||||
|
children.add(parentNode);
|
||||||
|
childrenMap.put(parentNode.getName(), parentNode);
|
||||||
|
}
|
||||||
|
// add n to the subtree of the next ancestor node
|
||||||
|
if (parentNode.add(n)) {
|
||||||
|
numOfLeaves++;
|
||||||
|
synchronized (childrenStorageInfo) {
|
||||||
|
if (!childrenStorageInfo.containsKey(parentNode.getName())) {
|
||||||
|
childrenStorageInfo.put(
|
||||||
|
parentNode.getName(), new EnumMap<>(StorageType.class));
|
||||||
|
for (StorageType st : dnDescriptor.getStorageTypes()) {
|
||||||
|
childrenStorageInfo.get(parentNode.getName()).put(st, 1);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
EnumMap<StorageType, Integer> currentCount =
|
||||||
|
childrenStorageInfo.get(parentNode.getName());
|
||||||
|
for (StorageType st : dnDescriptor.getStorageTypes()) {
|
||||||
|
if (currentCount.containsKey(st)) {
|
||||||
|
currentCount.put(st, currentCount.get(st) + 1);
|
||||||
|
} else {
|
||||||
|
currentCount.put(st, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
HashMap <String, EnumMap<StorageType, Integer>> getChildrenStorageInfo() {
|
||||||
|
return childrenStorageInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private DFSTopologyNodeImpl createParentNode(String parentName) {
|
||||||
|
return new DFSTopologyNodeImpl(
|
||||||
|
parentName, getPath(this), this, this.getLevel() + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean remove(Node n) {
|
||||||
|
if (!isAncestor(n)) {
|
||||||
|
throw new IllegalArgumentException(n.getName()
|
||||||
|
+ ", which is located at " + n.getNetworkLocation()
|
||||||
|
+ ", is not a descendant of " + getPath(this));
|
||||||
|
}
|
||||||
|
// In HDFS topology, the leaf node should always be DatanodeDescriptor
|
||||||
|
if (!(n instanceof DatanodeDescriptor)) {
|
||||||
|
throw new IllegalArgumentException("Unexpected node type "
|
||||||
|
+ n.getClass().getName());
|
||||||
|
}
|
||||||
|
DatanodeDescriptor dnDescriptor = (DatanodeDescriptor) n;
|
||||||
|
if (isParent(n)) {
|
||||||
|
// this node is the parent of n; remove n directly
|
||||||
|
if (childrenMap.containsKey(n.getName())) {
|
||||||
|
for (int i=0; i<children.size(); i++) {
|
||||||
|
if (children.get(i).getName().equals(n.getName())) {
|
||||||
|
children.remove(i);
|
||||||
|
childrenMap.remove(n.getName());
|
||||||
|
synchronized (childrenStorageInfo) {
|
||||||
|
childrenStorageInfo.remove(dnDescriptor.getName());
|
||||||
|
}
|
||||||
|
numOfLeaves--;
|
||||||
|
n.setParent(null);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
// find the next ancestor node: the parent node
|
||||||
|
String parentName = getNextAncestorName(n);
|
||||||
|
DFSTopologyNodeImpl parentNode =
|
||||||
|
(DFSTopologyNodeImpl)childrenMap.get(parentName);
|
||||||
|
if (parentNode == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// remove n from the parent node
|
||||||
|
boolean isRemoved = parentNode.remove(n);
|
||||||
|
if (isRemoved) {
|
||||||
|
// if the parent node has no children, remove the parent node too
|
||||||
|
synchronized (childrenStorageInfo) {
|
||||||
|
EnumMap<StorageType, Integer> currentCount =
|
||||||
|
childrenStorageInfo.get(parentNode.getName());
|
||||||
|
EnumSet<StorageType> toRemove = EnumSet.noneOf(StorageType.class);
|
||||||
|
for (StorageType st : dnDescriptor.getStorageTypes()) {
|
||||||
|
int newCount = currentCount.get(st) - 1;
|
||||||
|
if (newCount == 0) {
|
||||||
|
toRemove.add(st);
|
||||||
|
}
|
||||||
|
currentCount.put(st, newCount);
|
||||||
|
}
|
||||||
|
for (StorageType st : toRemove) {
|
||||||
|
currentCount.remove(st);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (parentNode.getNumOfChildren() == 0) {
|
||||||
|
for(int i=0; i < children.size(); i++) {
|
||||||
|
if (children.get(i).getName().equals(parentName)) {
|
||||||
|
children.remove(i);
|
||||||
|
childrenMap.remove(parentName);
|
||||||
|
childrenStorageInfo.remove(parentNode.getName());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
numOfLeaves--;
|
||||||
|
}
|
||||||
|
return isRemoved;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,260 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.Timeout;
|
||||||
|
|
||||||
|
import java.util.EnumMap;
|
||||||
|
import java.util.HashMap;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class tests the correctness of storage type info stored in
|
||||||
|
* DFSNetworkTopology.
|
||||||
|
*/
|
||||||
|
public class TestDFSNetworkTopology {
|
||||||
|
private static final Log LOG =
|
||||||
|
LogFactory.getLog(TestDFSNetworkTopology.class);
|
||||||
|
private final static DFSNetworkTopology CLUSTER =
|
||||||
|
DFSNetworkTopology.getInstance(new Configuration());
|
||||||
|
private DatanodeDescriptor[] dataNodes;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public Timeout testTimeout = new Timeout(30000);
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setupDatanodes() {
|
||||||
|
final String[] racks = {
|
||||||
|
"/l1/d1/r1", "/l1/d1/r1", "/l1/d1/r2", "/l1/d1/r2", "/l1/d1/r2",
|
||||||
|
|
||||||
|
"/l1/d2/r3", "/l1/d2/r3", "/l1/d2/r3",
|
||||||
|
|
||||||
|
"/l2/d3/r1", "/l2/d3/r2", "/l2/d3/r3", "/l2/d3/r4", "/l2/d3/r5",
|
||||||
|
|
||||||
|
"/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1",
|
||||||
|
"/l2/d4/r1", "/l2/d4/r1"};
|
||||||
|
final String[] hosts = {
|
||||||
|
"host1", "host2", "host3", "host4", "host5",
|
||||||
|
"host6", "host7", "host8", "host9", "host10",
|
||||||
|
"host11", "host12", "host13", "host14", "host15",
|
||||||
|
"host16", "host17", "host18", "host19", "host20"};
|
||||||
|
final StorageType[] types = {
|
||||||
|
StorageType.ARCHIVE, StorageType.DISK, StorageType.ARCHIVE,
|
||||||
|
StorageType.DISK, StorageType.DISK,
|
||||||
|
|
||||||
|
StorageType.DISK, StorageType.RAM_DISK, StorageType.SSD,
|
||||||
|
|
||||||
|
StorageType.DISK, StorageType.RAM_DISK, StorageType.DISK,
|
||||||
|
StorageType.ARCHIVE, StorageType.ARCHIVE,
|
||||||
|
|
||||||
|
StorageType.DISK, StorageType.DISK, StorageType.RAM_DISK,
|
||||||
|
StorageType.RAM_DISK, StorageType.ARCHIVE, StorageType.ARCHIVE,
|
||||||
|
StorageType.SSD};
|
||||||
|
final DatanodeStorageInfo[] storages =
|
||||||
|
DFSTestUtil.createDatanodeStorageInfos(20, racks, hosts, types);
|
||||||
|
dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
|
||||||
|
for (int i = 0; i < dataNodes.length; i++) {
|
||||||
|
CLUSTER.add(dataNodes[i]);
|
||||||
|
}
|
||||||
|
dataNodes[9].setDecommissioned();
|
||||||
|
dataNodes[10].setDecommissioned();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test getting the storage type info of subtree.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testGetStorageTypeInfo() throws Exception {
|
||||||
|
// checking level = 2 nodes
|
||||||
|
DFSTopologyNodeImpl d1 =
|
||||||
|
(DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1");
|
||||||
|
HashMap<String, EnumMap<StorageType, Integer>> d1info =
|
||||||
|
d1.getChildrenStorageInfo();
|
||||||
|
assertEquals(2, d1info.keySet().size());
|
||||||
|
assertTrue(d1info.get("r1").size() == 2 && d1info.get("r2").size() == 2);
|
||||||
|
assertEquals(1, (int)d1info.get("r1").get(StorageType.DISK));
|
||||||
|
assertEquals(1, (int)d1info.get("r1").get(StorageType.ARCHIVE));
|
||||||
|
assertEquals(2, (int)d1info.get("r2").get(StorageType.DISK));
|
||||||
|
assertEquals(1, (int)d1info.get("r2").get(StorageType.ARCHIVE));
|
||||||
|
|
||||||
|
DFSTopologyNodeImpl d2 =
|
||||||
|
(DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d2");
|
||||||
|
HashMap<String, EnumMap<StorageType, Integer>> d2info =
|
||||||
|
d2.getChildrenStorageInfo();
|
||||||
|
assertEquals(1, d2info.keySet().size());
|
||||||
|
assertTrue(d2info.get("r3").size() == 3);
|
||||||
|
assertEquals(1, (int)d2info.get("r3").get(StorageType.DISK));
|
||||||
|
assertEquals(1, (int)d2info.get("r3").get(StorageType.RAM_DISK));
|
||||||
|
assertEquals(1, (int)d2info.get("r3").get(StorageType.SSD));
|
||||||
|
|
||||||
|
DFSTopologyNodeImpl d3 =
|
||||||
|
(DFSTopologyNodeImpl) CLUSTER.getNode("/l2/d3");
|
||||||
|
HashMap<String, EnumMap<StorageType, Integer>> d3info =
|
||||||
|
d3.getChildrenStorageInfo();
|
||||||
|
assertEquals(5, d3info.keySet().size());
|
||||||
|
assertEquals(1, (int)d3info.get("r1").get(StorageType.DISK));
|
||||||
|
assertEquals(1, (int)d3info.get("r2").get(StorageType.RAM_DISK));
|
||||||
|
assertEquals(1, (int)d3info.get("r3").get(StorageType.DISK));
|
||||||
|
assertEquals(1, (int)d3info.get("r4").get(StorageType.ARCHIVE));
|
||||||
|
assertEquals(1, (int)d3info.get("r5").get(StorageType.ARCHIVE));
|
||||||
|
|
||||||
|
DFSTopologyNodeImpl d4 =
|
||||||
|
(DFSTopologyNodeImpl) CLUSTER.getNode("/l2/d4");
|
||||||
|
HashMap<String, EnumMap<StorageType, Integer>> d4info =
|
||||||
|
d4.getChildrenStorageInfo();
|
||||||
|
assertEquals(1, d4info.keySet().size());
|
||||||
|
assertEquals(2, (int)d4info.get("r1").get(StorageType.DISK));
|
||||||
|
assertEquals(2, (int)d4info.get("r1").get(StorageType.RAM_DISK));
|
||||||
|
assertEquals(2, (int)d4info.get("r1").get(StorageType.ARCHIVE));
|
||||||
|
assertEquals(1, (int)d4info.get("r1").get(StorageType.SSD));
|
||||||
|
|
||||||
|
DFSTopologyNodeImpl l1 =
|
||||||
|
(DFSTopologyNodeImpl) CLUSTER.getNode("/l1");
|
||||||
|
HashMap<String, EnumMap<StorageType, Integer>> l1info =
|
||||||
|
l1.getChildrenStorageInfo();
|
||||||
|
assertEquals(2, l1info.keySet().size());
|
||||||
|
assertTrue(l1info.get("d1").size() == 2
|
||||||
|
&& l1info.get("d2").size() == 3);
|
||||||
|
assertEquals(2, (int)l1info.get("d1").get(StorageType.ARCHIVE));
|
||||||
|
assertEquals(3, (int)l1info.get("d1").get(StorageType.DISK));
|
||||||
|
assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK));
|
||||||
|
assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK));
|
||||||
|
assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD));
|
||||||
|
|
||||||
|
// checking level = 1 nodes
|
||||||
|
DFSTopologyNodeImpl l2 =
|
||||||
|
(DFSTopologyNodeImpl) CLUSTER.getNode("/l2");
|
||||||
|
HashMap<String, EnumMap<StorageType, Integer>> l2info =
|
||||||
|
l2.getChildrenStorageInfo();
|
||||||
|
assertTrue(l2info.get("d3").size() == 3
|
||||||
|
&& l2info.get("d4").size() == 4);
|
||||||
|
assertEquals(2, l2info.keySet().size());
|
||||||
|
assertEquals(2, (int)l2info.get("d3").get(StorageType.DISK));
|
||||||
|
assertEquals(2, (int)l2info.get("d3").get(StorageType.ARCHIVE));
|
||||||
|
assertEquals(1, (int)l2info.get("d3").get(StorageType.RAM_DISK));
|
||||||
|
assertEquals(2, (int)l2info.get("d4").get(StorageType.DISK));
|
||||||
|
assertEquals(2, (int)l2info.get("d4").get(StorageType.ARCHIVE));
|
||||||
|
assertEquals(2, (int)l2info.get("d4").get(StorageType.RAM_DISK));
|
||||||
|
assertEquals(1, (int)l2info.get("d4").get(StorageType.SSD));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the correctness of storage type info when nodes are added and removed.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testAddAndRemoveTopology() throws Exception {
|
||||||
|
String[] newRack = {"/l1/d1/r1", "/l1/d1/r3", "/l1/d3/r3", "/l1/d3/r3"};
|
||||||
|
String[] newHost = {"nhost1", "nhost2", "nhost3", "nhost4"};
|
||||||
|
String[] newips = {"30.30.30.30", "31.31.31.31", "32.32.32.32",
|
||||||
|
"33.33.33.33"};
|
||||||
|
StorageType[] newTypes = {StorageType.DISK, StorageType.SSD,
|
||||||
|
StorageType.SSD, StorageType.SSD};
|
||||||
|
DatanodeDescriptor[] newDD = new DatanodeDescriptor[4];
|
||||||
|
|
||||||
|
for (int i = 0; i<4; i++) {
|
||||||
|
DatanodeStorageInfo dsi = DFSTestUtil.createDatanodeStorageInfo(
|
||||||
|
"s" + newHost[i], newips[i], newRack[i], newHost[i],
|
||||||
|
newTypes[i], null);
|
||||||
|
newDD[i] = dsi.getDatanodeDescriptor();
|
||||||
|
CLUSTER.add(newDD[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
DFSTopologyNodeImpl d1 =
|
||||||
|
(DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1");
|
||||||
|
HashMap<String, EnumMap<StorageType, Integer>> d1info =
|
||||||
|
d1.getChildrenStorageInfo();
|
||||||
|
assertEquals(3, d1info.keySet().size());
|
||||||
|
assertTrue(d1info.get("r1").size() == 2 && d1info.get("r2").size() == 2
|
||||||
|
&& d1info.get("r3").size() == 1);
|
||||||
|
assertEquals(2, (int)d1info.get("r1").get(StorageType.DISK));
|
||||||
|
assertEquals(1, (int)d1info.get("r1").get(StorageType.ARCHIVE));
|
||||||
|
assertEquals(2, (int)d1info.get("r2").get(StorageType.DISK));
|
||||||
|
assertEquals(1, (int)d1info.get("r2").get(StorageType.ARCHIVE));
|
||||||
|
assertEquals(1, (int)d1info.get("r3").get(StorageType.SSD));
|
||||||
|
|
||||||
|
DFSTopologyNodeImpl d3 =
|
||||||
|
(DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d3");
|
||||||
|
HashMap<String, EnumMap<StorageType, Integer>> d3info =
|
||||||
|
d3.getChildrenStorageInfo();
|
||||||
|
assertEquals(1, d3info.keySet().size());
|
||||||
|
assertTrue(d3info.get("r3").size() == 1);
|
||||||
|
assertEquals(2, (int)d3info.get("r3").get(StorageType.SSD));
|
||||||
|
|
||||||
|
DFSTopologyNodeImpl l1 =
|
||||||
|
(DFSTopologyNodeImpl) CLUSTER.getNode("/l1");
|
||||||
|
HashMap<String, EnumMap<StorageType, Integer>> l1info =
|
||||||
|
l1.getChildrenStorageInfo();
|
||||||
|
assertEquals(3, l1info.keySet().size());
|
||||||
|
assertTrue(l1info.get("d1").size() == 3 &&
|
||||||
|
l1info.get("d2").size() == 3 && l1info.get("d3").size() == 1);
|
||||||
|
assertEquals(4, (int)l1info.get("d1").get(StorageType.DISK));
|
||||||
|
assertEquals(2, (int)l1info.get("d1").get(StorageType.ARCHIVE));
|
||||||
|
assertEquals(1, (int)l1info.get("d1").get(StorageType.SSD));
|
||||||
|
assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD));
|
||||||
|
assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK));
|
||||||
|
assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK));
|
||||||
|
assertEquals(2, (int)l1info.get("d3").get(StorageType.SSD));
|
||||||
|
|
||||||
|
|
||||||
|
for (int i = 0; i<4; i++) {
|
||||||
|
CLUSTER.remove(newDD[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// /d1/r3 should've been out, /d1/r1 should've been resumed
|
||||||
|
DFSTopologyNodeImpl nd1 =
|
||||||
|
(DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1");
|
||||||
|
HashMap<String, EnumMap<StorageType, Integer>> nd1info =
|
||||||
|
nd1.getChildrenStorageInfo();
|
||||||
|
assertEquals(2, nd1info.keySet().size());
|
||||||
|
assertTrue(nd1info.get("r1").size() == 2 && nd1info.get("r2").size() == 2);
|
||||||
|
assertEquals(1, (int)nd1info.get("r1").get(StorageType.DISK));
|
||||||
|
assertEquals(1, (int)nd1info.get("r1").get(StorageType.ARCHIVE));
|
||||||
|
assertEquals(2, (int)nd1info.get("r2").get(StorageType.DISK));
|
||||||
|
assertEquals(1, (int)nd1info.get("r2").get(StorageType.ARCHIVE));
|
||||||
|
|
||||||
|
// /l1/d3 should've been out, and /l1/d1 should've been resumed
|
||||||
|
DFSTopologyNodeImpl nl1 =
|
||||||
|
(DFSTopologyNodeImpl) CLUSTER.getNode("/l1");
|
||||||
|
HashMap<String, EnumMap<StorageType, Integer>> nl1info =
|
||||||
|
nl1.getChildrenStorageInfo();
|
||||||
|
assertEquals(2, nl1info.keySet().size());
|
||||||
|
assertTrue(l1info.get("d1").size() == 2
|
||||||
|
&& l1info.get("d2").size() == 3);
|
||||||
|
assertEquals(2, (int)nl1info.get("d1").get(StorageType.ARCHIVE));
|
||||||
|
assertEquals(3, (int)nl1info.get("d1").get(StorageType.DISK));
|
||||||
|
assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK));
|
||||||
|
assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK));
|
||||||
|
assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD));
|
||||||
|
|
||||||
|
assertNull(CLUSTER.getNode("/l1/d3"));
|
||||||
|
}
|
||||||
|
}
|
|
@ -47,7 +47,8 @@ import org.junit.rules.Timeout;
|
||||||
|
|
||||||
public class TestNetworkTopology {
|
public class TestNetworkTopology {
|
||||||
private static final Log LOG = LogFactory.getLog(TestNetworkTopology.class);
|
private static final Log LOG = LogFactory.getLog(TestNetworkTopology.class);
|
||||||
private final static NetworkTopology cluster = new NetworkTopology();
|
private final static NetworkTopology cluster =
|
||||||
|
NetworkTopology.getInstance(new Configuration());
|
||||||
private DatanodeDescriptor dataNodes[];
|
private DatanodeDescriptor dataNodes[];
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
|
@ -101,7 +102,8 @@ public class TestNetworkTopology {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateInvalidTopology() throws Exception {
|
public void testCreateInvalidTopology() throws Exception {
|
||||||
NetworkTopology invalCluster = new NetworkTopology();
|
NetworkTopology invalCluster =
|
||||||
|
NetworkTopology.getInstance(new Configuration());
|
||||||
DatanodeDescriptor invalDataNodes[] = new DatanodeDescriptor[] {
|
DatanodeDescriptor invalDataNodes[] = new DatanodeDescriptor[] {
|
||||||
DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1"),
|
DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1"),
|
||||||
DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1"),
|
DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1"),
|
||||||
|
|
Loading…
Reference in New Issue