HDFS-9420. Add DataModels for DiskBalancer. Contributed by Anu Engineer
This commit is contained in:
parent
0b9edf6e0f
commit
91a5c48143
|
@ -0,0 +1,5 @@
|
||||||
|
HDFS-1312 Change Log
|
||||||
|
|
||||||
|
NEW FEATURES
|
||||||
|
|
||||||
|
HDFS-9420. Add DataModels for DiskBalancer. (Anu Engineer via szetszwo)
|
|
@ -0,0 +1,44 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p/>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p/>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ClusterConnector interface hides all specifics about how we communicate to
|
||||||
|
* the HDFS cluster. This interface returns data in classes that diskbalancer
|
||||||
|
* understands.
|
||||||
|
*/
|
||||||
|
public interface ClusterConnector {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* getNodes function returns a list of DiskBalancerDataNodes.
|
||||||
|
*
|
||||||
|
* @return Array of DiskBalancerDataNodes
|
||||||
|
*/
|
||||||
|
List<DiskBalancerDataNode> getNodes() throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns info about the connector.
|
||||||
|
*
|
||||||
|
* @return String.
|
||||||
|
*/
|
||||||
|
String getConnectorInfo();
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p/>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p/>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connectors package is a set of logical connectors that connect
|
||||||
|
* to various data sources to read the hadoop cluster information.
|
||||||
|
*
|
||||||
|
* We currently have 1 connector in this package. it is
|
||||||
|
*
|
||||||
|
* NullConnector - This is an in-memory connector that is useful in testing.
|
||||||
|
* we can crate dataNodes on the fly and attach to this connector and
|
||||||
|
* ask the diskBalancer Cluster to read data from this source.
|
||||||
|
*/
|
|
@ -0,0 +1,249 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p/>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p/>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
|
||||||
|
import org.codehaus.jackson.annotate.JsonIgnore;
|
||||||
|
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DiskBalancerCluster represents the nodes that we are working against.
|
||||||
|
* <p/>
|
||||||
|
* Please Note :
|
||||||
|
* <p/>
|
||||||
|
* Semantics of inclusionList and exclusionLists.
|
||||||
|
* <p/>
|
||||||
|
* If a non-empty inclusionList is specified then the diskBalancer assumes that
|
||||||
|
* the user is only interested in processing that list of nodes. This node list
|
||||||
|
* is checked against the exclusionList and only the nodes in inclusionList but
|
||||||
|
* not in exclusionList is processed.
|
||||||
|
* <p/>
|
||||||
|
* if inclusionList is empty, then we assume that all live nodes in the nodes is
|
||||||
|
* to be processed by diskBalancer. In that case diskBalancer will avoid any
|
||||||
|
* nodes specified in the exclusionList but will process all nodes in the
|
||||||
|
* cluster.
|
||||||
|
* <p/>
|
||||||
|
* In other words, an empty inclusionList is means all the nodes otherwise
|
||||||
|
* only a given list is processed and ExclusionList is always honored.
|
||||||
|
*/
|
||||||
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
|
public class DiskBalancerCluster {
|
||||||
|
|
||||||
|
static final Log LOG = LogFactory.getLog(DiskBalancerCluster.class);
|
||||||
|
private final Set<String> exclusionList;
|
||||||
|
private final Set<String> inclusionList;
|
||||||
|
private ClusterConnector clusterConnector;
|
||||||
|
private List<DiskBalancerDataNode> nodes;
|
||||||
|
private String outputpath;
|
||||||
|
|
||||||
|
@JsonIgnore
|
||||||
|
private List<DiskBalancerDataNode> nodesToProcess;
|
||||||
|
private float threshold;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Empty Constructor needed by Jackson.
|
||||||
|
*/
|
||||||
|
public DiskBalancerCluster() {
|
||||||
|
nodes = new LinkedList<>();
|
||||||
|
exclusionList = new TreeSet<>();
|
||||||
|
inclusionList = new TreeSet<>();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a DiskBalancerCluster.
|
||||||
|
*
|
||||||
|
* @param connector - ClusterConnector
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public DiskBalancerCluster(ClusterConnector connector) throws IOException {
|
||||||
|
Preconditions.checkNotNull(connector);
|
||||||
|
clusterConnector = connector;
|
||||||
|
exclusionList = new TreeSet<>();
|
||||||
|
inclusionList = new TreeSet<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parses a Json string and converts to DiskBalancerCluster.
|
||||||
|
*
|
||||||
|
* @param json - Json String
|
||||||
|
* @return DiskBalancerCluster
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static DiskBalancerCluster parseJson(String json) throws IOException {
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
return mapper.readValue(json, DiskBalancerCluster.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* readClusterInfo connects to the cluster and reads the node's data. This
|
||||||
|
* data is used as basis of rest of computation in DiskBalancerCluster
|
||||||
|
*/
|
||||||
|
public void readClusterInfo() throws Exception {
|
||||||
|
Preconditions.checkNotNull(clusterConnector);
|
||||||
|
LOG.info("Using connector : " + clusterConnector.getConnectorInfo());
|
||||||
|
nodes = clusterConnector.getNodes();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets all DataNodes in the Cluster.
|
||||||
|
*
|
||||||
|
* @return Array of DisKBalancerDataNodes
|
||||||
|
*/
|
||||||
|
public List<DiskBalancerDataNode> getNodes() {
|
||||||
|
return nodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the list of nodes of this cluster.
|
||||||
|
*
|
||||||
|
* @param clusterNodes List of Nodes
|
||||||
|
*/
|
||||||
|
public void setNodes(List<DiskBalancerDataNode> clusterNodes) {
|
||||||
|
this.nodes = clusterNodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the current ExclusionList.
|
||||||
|
*
|
||||||
|
* @return List of Nodes that are excluded from diskBalancer right now.
|
||||||
|
*/
|
||||||
|
public Set<String> getExclusionList() {
|
||||||
|
return exclusionList;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* sets the list of nodes to exclude from process of diskBalancer.
|
||||||
|
*
|
||||||
|
* @param excludedNodes - exclusionList of nodes.
|
||||||
|
*/
|
||||||
|
public void setExclusionList(Set<String> excludedNodes) {
|
||||||
|
this.exclusionList.addAll(excludedNodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the threshold value. This is used for indicating how much skew is
|
||||||
|
* acceptable, This is expressed as a percentage. For example to say 20% skew
|
||||||
|
* between volumes is acceptable set this value to 20.
|
||||||
|
*
|
||||||
|
* @return float
|
||||||
|
*/
|
||||||
|
public float getThreshold() {
|
||||||
|
return threshold;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the threshold value.
|
||||||
|
*
|
||||||
|
* @param thresholdPercent - float - in percentage
|
||||||
|
*/
|
||||||
|
public void setThreshold(float thresholdPercent) {
|
||||||
|
Preconditions.checkState((thresholdPercent >= 0.0f) &&
|
||||||
|
(thresholdPercent <= 100.0f), "A percentage value expected.");
|
||||||
|
this.threshold = thresholdPercent;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the Inclusion list.
|
||||||
|
*
|
||||||
|
* @return List of machine to be processed by diskBalancer.
|
||||||
|
*/
|
||||||
|
public Set<String> getInclusionList() {
|
||||||
|
return inclusionList;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the inclusionList.
|
||||||
|
*
|
||||||
|
* @param includeNodes - set of machines to be processed by diskBalancer.
|
||||||
|
*/
|
||||||
|
public void setInclusionList(Set<String> includeNodes) {
|
||||||
|
this.inclusionList.addAll(includeNodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* returns a serialized json string.
|
||||||
|
*
|
||||||
|
* @return String - json
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public String toJson() throws IOException {
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
return mapper.writeValueAsString(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the Nodes to Process which is the real list of nodes processed by
|
||||||
|
* diskBalancer.
|
||||||
|
*
|
||||||
|
* @return List of DiskBalancerDataNodes
|
||||||
|
*/
|
||||||
|
@JsonIgnore
|
||||||
|
public List<DiskBalancerDataNode> getNodesToProcess() {
|
||||||
|
return nodesToProcess;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the nodes to process.
|
||||||
|
*
|
||||||
|
* @param dnNodesToProcess - List of DataNodes to process
|
||||||
|
*/
|
||||||
|
@JsonIgnore
|
||||||
|
public void setNodesToProcess(List<DiskBalancerDataNode> dnNodesToProcess) {
|
||||||
|
this.nodesToProcess = dnNodesToProcess;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns th output path for this cluster.
|
||||||
|
*/
|
||||||
|
public String getOutput() {
|
||||||
|
return outputpath;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the output path for this run.
|
||||||
|
*
|
||||||
|
* @param output - Path
|
||||||
|
*/
|
||||||
|
public void setOutput(String output) {
|
||||||
|
this.outputpath = output;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes a snapshot of the cluster to the specified directory.
|
||||||
|
*
|
||||||
|
* @param snapShotName - name of the snapshot
|
||||||
|
*/
|
||||||
|
public void createSnapshot(String snapShotName) throws IOException {
|
||||||
|
String json = this.toJson();
|
||||||
|
File outFile = new File(getOutput() + "/" + snapShotName);
|
||||||
|
FileUtils.writeStringToFile(outFile, json);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,269 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p/>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p/>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DiskBalancerDataNode represents a DataNode that exists in the cluster. It
|
||||||
|
* also contains a metric called nodeDataDensity which allows us to compare
|
||||||
|
* between a set of Nodes.
|
||||||
|
*/
|
||||||
|
public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
|
||||||
|
private float nodeDataDensity;
|
||||||
|
private Map<String, DiskBalancerVolumeSet> volumeSets;
|
||||||
|
private String dataNodeUUID;
|
||||||
|
private String dataNodeIP;
|
||||||
|
private int dataNodePort;
|
||||||
|
private String dataNodeName;
|
||||||
|
private int volumeCount;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs an Empty Data Node.
|
||||||
|
*/
|
||||||
|
public DiskBalancerDataNode() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a DataNode.
|
||||||
|
*
|
||||||
|
* @param dataNodeID - Node ID
|
||||||
|
*/
|
||||||
|
public DiskBalancerDataNode(String dataNodeID) {
|
||||||
|
this.dataNodeUUID = dataNodeID;
|
||||||
|
volumeSets = new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the IP address of this Node.
|
||||||
|
*
|
||||||
|
* @return IP Address string
|
||||||
|
*/
|
||||||
|
public String getDataNodeIP() {
|
||||||
|
return dataNodeIP;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the IP address of this Node.
|
||||||
|
*
|
||||||
|
* @param ipaddress - IP Address
|
||||||
|
*/
|
||||||
|
public void setDataNodeIP(String ipaddress) {
|
||||||
|
this.dataNodeIP = ipaddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the Port of this DataNode.
|
||||||
|
*
|
||||||
|
* @return Port Number
|
||||||
|
*/
|
||||||
|
public int getDataNodePort() {
|
||||||
|
return dataNodePort;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the DataNode Port number.
|
||||||
|
*
|
||||||
|
* @param port - Datanode Port Number
|
||||||
|
*/
|
||||||
|
public void setDataNodePort(int port) {
|
||||||
|
this.dataNodePort = port;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get DataNode DNS name.
|
||||||
|
*
|
||||||
|
* @return name of the node
|
||||||
|
*/
|
||||||
|
public String getDataNodeName() {
|
||||||
|
return dataNodeName;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets node's DNS name.
|
||||||
|
*
|
||||||
|
* @param name - Data node name
|
||||||
|
*/
|
||||||
|
public void setDataNodeName(String name) {
|
||||||
|
this.dataNodeName = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the Volume sets on this node.
|
||||||
|
*
|
||||||
|
* @return a Map of VolumeSets
|
||||||
|
*/
|
||||||
|
public Map<String, DiskBalancerVolumeSet> getVolumeSets() {
|
||||||
|
return volumeSets;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns datanode ID.
|
||||||
|
**/
|
||||||
|
public String getDataNodeUUID() {
|
||||||
|
return dataNodeUUID;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets Datanode UUID.
|
||||||
|
*
|
||||||
|
* @param nodeID - Node ID.
|
||||||
|
*/
|
||||||
|
public void setDataNodeUUID(String nodeID) {
|
||||||
|
this.dataNodeUUID = nodeID;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates whether some other object is "equal to" this one.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if ((obj == null) || (obj.getClass() != getClass())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
DiskBalancerDataNode that = (DiskBalancerDataNode) obj;
|
||||||
|
return dataNodeUUID.equals(that.getDataNodeUUID());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compares this object with the specified object for order. Returns a
|
||||||
|
* negative integer, zero, or a positive integer as this object is less than,
|
||||||
|
* equal to, or greater than the specified object.
|
||||||
|
*
|
||||||
|
* @param that the object to be compared.
|
||||||
|
* @return a negative integer, zero, or a positive integer as this object is
|
||||||
|
* less than, equal to, or greater than the specified object.
|
||||||
|
* @throws NullPointerException if the specified object is null
|
||||||
|
* @throws ClassCastException if the specified object's type prevents it
|
||||||
|
* from being compared to this object.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int compareTo(DiskBalancerDataNode that) {
|
||||||
|
Preconditions.checkNotNull(that);
|
||||||
|
|
||||||
|
if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
|
||||||
|
< 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
|
||||||
|
== 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0)
|
||||||
|
> 0) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a hash code value for the object. This method is supported for the
|
||||||
|
* benefit of hash tables such as those provided by {@link HashMap}.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return super.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* returns NodeDataDensity Metric.
|
||||||
|
*
|
||||||
|
* @return float
|
||||||
|
*/
|
||||||
|
public float getNodeDataDensity() {
|
||||||
|
return nodeDataDensity;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* computes nodes data density.
|
||||||
|
* <p/>
|
||||||
|
* This metric allows us to compare different nodes and how well the data is
|
||||||
|
* spread across a set of volumes inside the node.
|
||||||
|
*/
|
||||||
|
public void computeNodeDensity() {
|
||||||
|
float sum = 0;
|
||||||
|
int volcount = 0;
|
||||||
|
for (DiskBalancerVolumeSet vset : volumeSets.values()) {
|
||||||
|
for (DiskBalancerVolume vol : vset.getVolumes()) {
|
||||||
|
sum += Math.abs(vol.getVolumeDataDensity());
|
||||||
|
volcount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nodeDataDensity = sum;
|
||||||
|
this.volumeCount = volcount;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes if this node needs balancing at all.
|
||||||
|
*
|
||||||
|
* @param threshold - Percentage
|
||||||
|
* @return true or false
|
||||||
|
*/
|
||||||
|
public boolean isBalancingNeeded(float threshold) {
|
||||||
|
for (DiskBalancerVolumeSet vSet : getVolumeSets().values()) {
|
||||||
|
if (vSet.isBalancingNeeded(threshold)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds a volume to the DataNode.
|
||||||
|
* <p/>
|
||||||
|
* it is assumed that we have one thread per node hence this call is not
|
||||||
|
* synchronised neither is the map is protected.
|
||||||
|
*
|
||||||
|
* @param volume - volume
|
||||||
|
*/
|
||||||
|
public void addVolume(DiskBalancerVolume volume) throws Exception {
|
||||||
|
Preconditions.checkNotNull(volume, "volume cannot be null");
|
||||||
|
Preconditions.checkNotNull(volumeSets, "volume sets cannot be null");
|
||||||
|
Preconditions
|
||||||
|
.checkNotNull(volume.getStorageType(), "storage type cannot be null");
|
||||||
|
|
||||||
|
String volumeSetKey = volume.getStorageType();
|
||||||
|
DiskBalancerVolumeSet vSet;
|
||||||
|
if (volumeSets.containsKey(volumeSetKey)) {
|
||||||
|
vSet = volumeSets.get(volumeSetKey);
|
||||||
|
} else {
|
||||||
|
vSet = new DiskBalancerVolumeSet(volume.isTransient());
|
||||||
|
volumeSets.put(volumeSetKey, vSet);
|
||||||
|
}
|
||||||
|
|
||||||
|
vSet.addVolume(volume);
|
||||||
|
computeNodeDensity();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns how many volumes are in the DataNode.
|
||||||
|
*
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
|
public int getVolumeCount() {
|
||||||
|
return volumeCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,330 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p/>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p/>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
|
import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DiskBalancerVolume represents a volume in the DataNode.
|
||||||
|
*/
|
||||||
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
|
public class DiskBalancerVolume {
|
||||||
|
private String path;
|
||||||
|
private long capacity;
|
||||||
|
private String storageType;
|
||||||
|
private long used;
|
||||||
|
private long reserved;
|
||||||
|
private String uuid;
|
||||||
|
private boolean failed;
|
||||||
|
private boolean isTransient;
|
||||||
|
private float volumeDataDensity;
|
||||||
|
private boolean skip = false;
|
||||||
|
private boolean isReadOnly;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs DiskBalancerVolume.
|
||||||
|
*/
|
||||||
|
public DiskBalancerVolume() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parses a Json string and converts to DiskBalancerVolume.
|
||||||
|
*
|
||||||
|
* @param json - Json String
|
||||||
|
*
|
||||||
|
* @return DiskBalancerCluster
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static DiskBalancerVolume parseJson(String json) throws IOException {
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
return mapper.readValue(json, DiskBalancerVolume.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get this volume Data Density
|
||||||
|
* Please see DiskBalancerVolumeSet#computeVolumeDataDensity to see how
|
||||||
|
* this is computed.
|
||||||
|
*
|
||||||
|
* @return float.
|
||||||
|
*/
|
||||||
|
public float getVolumeDataDensity() {
|
||||||
|
return volumeDataDensity;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets this volume's data density.
|
||||||
|
*
|
||||||
|
* @param volDataDensity - density
|
||||||
|
*/
|
||||||
|
public void setVolumeDataDensity(float volDataDensity) {
|
||||||
|
this.volumeDataDensity = volDataDensity;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates if the volume is Transient in nature.
|
||||||
|
*
|
||||||
|
* @return true or false.
|
||||||
|
*/
|
||||||
|
public boolean isTransient() {
|
||||||
|
return isTransient;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets volumes transient nature.
|
||||||
|
*
|
||||||
|
* @param aTransient - bool
|
||||||
|
*/
|
||||||
|
public void setTransient(boolean aTransient) {
|
||||||
|
this.isTransient = aTransient;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compares two volumes and decides if it is the same volume.
|
||||||
|
*
|
||||||
|
* @param o Volume Object
|
||||||
|
*
|
||||||
|
* @return boolean
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
DiskBalancerVolume that = (DiskBalancerVolume) o;
|
||||||
|
return uuid.equals(that.uuid);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes hash code for a diskBalancerVolume.
|
||||||
|
*
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return uuid.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Capacity of this volume.
|
||||||
|
*
|
||||||
|
* @return long
|
||||||
|
*/
|
||||||
|
public long getCapacity() {
|
||||||
|
return capacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the capacity of this volume.
|
||||||
|
*
|
||||||
|
* @param totalCapacity long
|
||||||
|
*/
|
||||||
|
public void setCapacity(long totalCapacity) {
|
||||||
|
this.capacity = totalCapacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates if this is a failed volume.
|
||||||
|
*
|
||||||
|
* @return boolean
|
||||||
|
*/
|
||||||
|
public boolean isFailed() {
|
||||||
|
return failed;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the failed flag for this volume.
|
||||||
|
*
|
||||||
|
* @param fail boolean
|
||||||
|
*/
|
||||||
|
public void setFailed(boolean fail) {
|
||||||
|
this.failed = fail;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the path for this volume.
|
||||||
|
*
|
||||||
|
* @return String
|
||||||
|
*/
|
||||||
|
public String getPath() {
|
||||||
|
return path;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the path for this volume.
|
||||||
|
*
|
||||||
|
* @param volPath Path
|
||||||
|
*/
|
||||||
|
public void setPath(String volPath) {
|
||||||
|
this.path = volPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the reserved size for this volume.
|
||||||
|
*
|
||||||
|
* @return Long - Reserved size.
|
||||||
|
*/
|
||||||
|
public long getReserved() {
|
||||||
|
return reserved;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the reserved size.
|
||||||
|
*
|
||||||
|
* @param reservedSize -- Sets the reserved.
|
||||||
|
*/
|
||||||
|
public void setReserved(long reservedSize) {
|
||||||
|
this.reserved = reservedSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the StorageType.
|
||||||
|
*
|
||||||
|
* @return String StorageType.
|
||||||
|
*/
|
||||||
|
public String getStorageType() {
|
||||||
|
return storageType;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the StorageType.
|
||||||
|
*
|
||||||
|
* @param typeOfStorage - Storage Type String.
|
||||||
|
*/
|
||||||
|
public void setStorageType(String typeOfStorage) {
|
||||||
|
this.storageType = typeOfStorage;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the dfsUsed Size.
|
||||||
|
*
|
||||||
|
* @return - long - used space
|
||||||
|
*/
|
||||||
|
public long getUsed() {
|
||||||
|
return used;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the used Space for Long.
|
||||||
|
*
|
||||||
|
* @param dfsUsedSpace - dfsUsedSpace for this volume.
|
||||||
|
*/
|
||||||
|
public void setUsed(long dfsUsedSpace) {
|
||||||
|
Preconditions.checkArgument(dfsUsedSpace < this.getCapacity());
|
||||||
|
this.used = dfsUsedSpace;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the uuid for this volume.
|
||||||
|
*
|
||||||
|
* @return String - uuid of th volume
|
||||||
|
*/
|
||||||
|
public String getUuid() {
|
||||||
|
return uuid;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the uuid for this volume.
|
||||||
|
*
|
||||||
|
* @param id - String
|
||||||
|
*/
|
||||||
|
public void setUuid(String id) {
|
||||||
|
this.uuid = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns effective capacity of a volume.
|
||||||
|
*
|
||||||
|
* @return float - fraction that represents used capacity.
|
||||||
|
*/
|
||||||
|
@JsonIgnore
|
||||||
|
public long computeEffectiveCapacity() {
|
||||||
|
return getCapacity() - getReserved();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* returns a Json String.
|
||||||
|
*
|
||||||
|
* @return String
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public String toJson() throws IOException {
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
return mapper.writeValueAsString(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* returns if we should skip this volume.
|
||||||
|
* @return true / false
|
||||||
|
*/
|
||||||
|
public boolean isSkip() {
|
||||||
|
return skip;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the Skip value for this volume.
|
||||||
|
* @param skipValue bool
|
||||||
|
*/
|
||||||
|
public void setSkip(boolean skipValue) {
|
||||||
|
this.skip = skipValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the usedPercentage of a disk.
|
||||||
|
* This is useful in debugging disk usage
|
||||||
|
* @return float
|
||||||
|
*/
|
||||||
|
public float computeUsedPercentage() {
|
||||||
|
return (float) (getUsed()) / (float) (getCapacity());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tells us if a volume is transient.
|
||||||
|
* @param transientValue
|
||||||
|
*/
|
||||||
|
public void setIsTransient(boolean transientValue) {
|
||||||
|
this.isTransient = transientValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tells us if this volume is read-only.
|
||||||
|
* @return true / false
|
||||||
|
*/
|
||||||
|
public boolean isReadOnly() {
|
||||||
|
return isReadOnly;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets this volume as read only.
|
||||||
|
* @param readOnly - boolean
|
||||||
|
*/
|
||||||
|
public void setReadOnly(boolean readOnly) {
|
||||||
|
isReadOnly = readOnly;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,325 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
|
import org.apache.htrace.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DiskBalancerVolumeSet is a collection of storage devices on the
|
||||||
|
* data node which are of similar StorageType.
|
||||||
|
*/
|
||||||
|
@JsonIgnoreProperties({"sortedQueue", "volumeCount", "idealUsed"})
|
||||||
|
public class DiskBalancerVolumeSet {
|
||||||
|
static final Log LOG = LogFactory.getLog(DiskBalancerVolumeSet.class);
|
||||||
|
private final int maxDisks = 256;
|
||||||
|
|
||||||
|
@JsonProperty("transient")
|
||||||
|
private boolean isTransient;
|
||||||
|
private Set<DiskBalancerVolume> volumes;
|
||||||
|
|
||||||
|
@JsonIgnore
|
||||||
|
private TreeSet<DiskBalancerVolume> sortedQueue;
|
||||||
|
private String storageType;
|
||||||
|
private String setID;
|
||||||
|
|
||||||
|
private float idealUsed;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs Empty DiskNBalanceVolumeSet.
|
||||||
|
* This is needed by jackson
|
||||||
|
*/
|
||||||
|
public DiskBalancerVolumeSet() {
|
||||||
|
setID = UUID.randomUUID().toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a DiskBalancerVolumeSet.
|
||||||
|
*
|
||||||
|
* @param isTransient - boolean
|
||||||
|
*/
|
||||||
|
public DiskBalancerVolumeSet(boolean isTransient) {
|
||||||
|
this.isTransient = isTransient;
|
||||||
|
volumes = new HashSet<>(maxDisks);
|
||||||
|
sortedQueue = new TreeSet<>(new MinHeap());
|
||||||
|
this.storageType = null;
|
||||||
|
setID = UUID.randomUUID().toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a new DiskBalancerVolumeSet.
|
||||||
|
*/
|
||||||
|
public DiskBalancerVolumeSet(DiskBalancerVolumeSet volumeSet) {
|
||||||
|
this.isTransient = volumeSet.isTransient();
|
||||||
|
this.storageType = volumeSet.storageType;
|
||||||
|
this.volumes = new HashSet<>(volumeSet.volumes);
|
||||||
|
sortedQueue = new TreeSet<>(new MinHeap());
|
||||||
|
setID = UUID.randomUUID().toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tells us if this volumeSet is transient.
|
||||||
|
*
|
||||||
|
* @return - true or false
|
||||||
|
*/
|
||||||
|
@JsonProperty("transient")
|
||||||
|
public boolean isTransient() {
|
||||||
|
return isTransient;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the transient properties for this volumeSet.
|
||||||
|
*
|
||||||
|
* @param transientValue - Boolean
|
||||||
|
*/
|
||||||
|
@JsonProperty("transient")
|
||||||
|
public void setTransient(boolean transientValue) {
|
||||||
|
this.isTransient = transientValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes Volume Data Density. Adding a new volume changes
|
||||||
|
* the volumeDataDensity for all volumes. So we throw away
|
||||||
|
* our priority queue and recompute everything.
|
||||||
|
*
|
||||||
|
* we discard failed volumes from this computation.
|
||||||
|
*
|
||||||
|
* totalCapacity = totalCapacity of this volumeSet
|
||||||
|
* totalUsed = totalDfsUsed for this volumeSet
|
||||||
|
* idealUsed = totalUsed / totalCapacity
|
||||||
|
* dfsUsedRatio = dfsUsedOnAVolume / Capacity On that Volume
|
||||||
|
* volumeDataDensity = idealUsed - dfsUsedRatio
|
||||||
|
*/
|
||||||
|
public void computeVolumeDataDensity() {
|
||||||
|
long totalCapacity = 0;
|
||||||
|
long totalUsed = 0;
|
||||||
|
sortedQueue.clear();
|
||||||
|
|
||||||
|
// when we plan to re-distribute data we need to make
|
||||||
|
// sure that we skip failed volumes.
|
||||||
|
for (DiskBalancerVolume volume : volumes) {
|
||||||
|
if (!volume.isFailed() && !volume.isSkip()) {
|
||||||
|
|
||||||
|
if (volume.computeEffectiveCapacity() < 0) {
|
||||||
|
skipMisConfiguredVolume(volume);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
totalCapacity += volume.computeEffectiveCapacity();
|
||||||
|
totalUsed += volume.getUsed();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (totalCapacity != 0) {
|
||||||
|
this.idealUsed = totalUsed / (float) totalCapacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (DiskBalancerVolume volume : volumes) {
|
||||||
|
if (!volume.isFailed() && !volume.isSkip()) {
|
||||||
|
float dfsUsedRatio =
|
||||||
|
volume.getUsed() / (float) volume.computeEffectiveCapacity();
|
||||||
|
volume.setVolumeDataDensity(this.idealUsed - dfsUsedRatio);
|
||||||
|
sortedQueue.add(volume);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void skipMisConfiguredVolume(DiskBalancerVolume volume) {
|
||||||
|
//probably points to some sort of mis-configuration. Log this and skip
|
||||||
|
// processing this volume.
|
||||||
|
String errMessage = String.format("Real capacity is negative." +
|
||||||
|
"This usually points to some " +
|
||||||
|
"kind of mis-configuration.%n" +
|
||||||
|
"Capacity : %d Reserved : %d " +
|
||||||
|
"realCap = capacity - " +
|
||||||
|
"reserved = %d.%n" +
|
||||||
|
"Skipping this volume from " +
|
||||||
|
"all processing. type : %s id" +
|
||||||
|
" :%s",
|
||||||
|
volume.getCapacity(),
|
||||||
|
volume.getReserved(),
|
||||||
|
volume.computeEffectiveCapacity(),
|
||||||
|
volume.getStorageType(),
|
||||||
|
volume.getUuid());
|
||||||
|
|
||||||
|
LOG.fatal(errMessage);
|
||||||
|
volume.setSkip(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of volumes in the Volume Set.
|
||||||
|
*
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
|
@JsonIgnore
|
||||||
|
public int getVolumeCount() {
|
||||||
|
return volumes.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get Storage Type.
|
||||||
|
*
|
||||||
|
* @return String
|
||||||
|
*/
|
||||||
|
public String getStorageType() {
|
||||||
|
return storageType;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set Storage Type.
|
||||||
|
* @param typeOfStorage -- StorageType
|
||||||
|
*/
|
||||||
|
public void setStorageType(String typeOfStorage) {
|
||||||
|
this.storageType = typeOfStorage;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* adds a given volume into this volume set.
|
||||||
|
*
|
||||||
|
* @param volume - volume to add.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public void addVolume(DiskBalancerVolume volume) throws Exception {
|
||||||
|
Preconditions.checkNotNull(volume, "volume cannot be null");
|
||||||
|
Preconditions.checkState(isTransient() == volume.isTransient(),
|
||||||
|
"Mismatch in volumeSet and volume's transient " +
|
||||||
|
"properties.");
|
||||||
|
|
||||||
|
|
||||||
|
if (this.storageType == null) {
|
||||||
|
Preconditions.checkState(volumes.size() == 0L, "Storage Type is Null but"
|
||||||
|
+ " volume size is " + volumes.size());
|
||||||
|
this.storageType = volume.getStorageType();
|
||||||
|
} else {
|
||||||
|
Preconditions.checkState(this.storageType.equals(volume.getStorageType()),
|
||||||
|
"Adding wrong type of disk to this volume set");
|
||||||
|
}
|
||||||
|
volumes.add(volume);
|
||||||
|
computeVolumeDataDensity();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a list diskVolumes that are part of this volume set.
|
||||||
|
*
|
||||||
|
* @return List
|
||||||
|
*/
|
||||||
|
public List<DiskBalancerVolume> getVolumes() {
|
||||||
|
return new ArrayList<>(volumes);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@JsonIgnore
|
||||||
|
public TreeSet<DiskBalancerVolume> getSortedQueue() {
|
||||||
|
return sortedQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes whether we need to do any balancing on this volume Set at all.
|
||||||
|
* It checks if any disks are out of threshold value
|
||||||
|
*
|
||||||
|
* @param thresholdPercentage - threshold - in percentage
|
||||||
|
*
|
||||||
|
* @return true if balancing is needed false otherwise.
|
||||||
|
*/
|
||||||
|
public boolean isBalancingNeeded(float thresholdPercentage) {
|
||||||
|
float threshold = thresholdPercentage / 100.0f;
|
||||||
|
|
||||||
|
if(volumes == null || volumes.size() <= 1) {
|
||||||
|
// there is nothing we can do with a single volume.
|
||||||
|
// so no planning needed.
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (DiskBalancerVolume vol : volumes) {
|
||||||
|
boolean notSkip = !vol.isFailed() && !vol.isTransient() && !vol.isSkip();
|
||||||
|
if ((Math.abs(vol.getVolumeDataDensity()) > threshold) && notSkip) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a volume from the current set.
|
||||||
|
*
|
||||||
|
* This call does not recompute the volumeDataDensity. It has to be
|
||||||
|
* done manually after this call.
|
||||||
|
*
|
||||||
|
* @param volume - Volume to remove
|
||||||
|
*/
|
||||||
|
public void removeVolume(DiskBalancerVolume volume) {
|
||||||
|
volumes.remove(volume);
|
||||||
|
sortedQueue.remove(volume);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get Volume Set ID.
|
||||||
|
* @return String
|
||||||
|
*/
|
||||||
|
public String getSetID() {
|
||||||
|
return setID;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set VolumeSet ID.
|
||||||
|
* @param volID String
|
||||||
|
*/
|
||||||
|
public void setSetID(String volID) {
|
||||||
|
this.setID = volID;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the idealUsed for this volume set.
|
||||||
|
*/
|
||||||
|
|
||||||
|
@JsonIgnore
|
||||||
|
public float getIdealUsed() {
|
||||||
|
return this.idealUsed;
|
||||||
|
}
|
||||||
|
|
||||||
|
static class MinHeap implements Comparator<DiskBalancerVolume>, Serializable {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compares its two arguments for order. Returns a negative integer,
|
||||||
|
* zero, or a positive integer as the first argument is less than, equal
|
||||||
|
* to, or greater than the second.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int compare(DiskBalancerVolume first, DiskBalancerVolume second) {
|
||||||
|
return Float
|
||||||
|
.compare(second.getVolumeDataDensity(), first.getVolumeDataDensity());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
|
||||||
|
/**
|
||||||
|
* Disk Balancer Data Model is the Data Model for the cluster that
|
||||||
|
* Disk Balancer is working against. This information is read
|
||||||
|
* directly from NameNode or from a user supplied json model file.
|
||||||
|
*
|
||||||
|
* Here is the overview of the model maintained by diskBalancer.
|
||||||
|
*
|
||||||
|
* DiskBalancerCluster is a list of DiskBalancerDataNodes.
|
||||||
|
* DiskBalancerDataNodes is a collection of DiskBalancerVolumeSets
|
||||||
|
* DiskBalancerVolumeSets is a collection of DiskBalancerVolumes
|
||||||
|
* DiskBalancerVolumes represents actual volumes on DataNodes.
|
||||||
|
*/
|
|
@ -0,0 +1,36 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer;
|
||||||
|
/**
|
||||||
|
* Disk Balancer connects to a {@link org.apache.hadoop.hdfs.server.datanode
|
||||||
|
* .DataNode} and attempts to spread data across all volumes evenly.
|
||||||
|
*
|
||||||
|
* This is achieved by :
|
||||||
|
*
|
||||||
|
* 1) Calculating the average data that should be on a set of volumes grouped
|
||||||
|
* by the type. For example, how much data should be on each volume of SSDs on a
|
||||||
|
* machine.
|
||||||
|
*
|
||||||
|
* 2) Once we know the average data that is expected to be on a volume we
|
||||||
|
* move data from volumes with higher than average load to volumes with
|
||||||
|
* less than average load.
|
||||||
|
*
|
||||||
|
* 3) Disk Balancer operates against data nodes which are live and operational.
|
||||||
|
*
|
||||||
|
*/
|
|
@ -0,0 +1,227 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p/>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p/>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.NullConnector;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class to create various cluster configrations at run time.
|
||||||
|
*/
|
||||||
|
public class DiskBalancerTestUtil {
|
||||||
|
// we modeling disks here, hence HDD style units
|
||||||
|
public static final long GB = 1000000000L;
|
||||||
|
public static final long TB = 1000000000000L;
|
||||||
|
private static int[] diskSizes =
|
||||||
|
{1, 2, 3, 4, 5, 6, 7, 8, 9, 100, 200, 300, 400, 500, 600, 700, 800, 900};
|
||||||
|
Random rand;
|
||||||
|
private String stringTable =
|
||||||
|
"ABCDEDFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0987654321";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a util class.
|
||||||
|
*/
|
||||||
|
public DiskBalancerTestUtil() {
|
||||||
|
this.rand = new Random(Time.monotonicNow());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a random string.
|
||||||
|
*
|
||||||
|
* @param length - Number of chars in the string
|
||||||
|
*
|
||||||
|
* @return random String
|
||||||
|
*/
|
||||||
|
private String getRandomName(int length) {
|
||||||
|
StringBuilder name = new StringBuilder();
|
||||||
|
for (int x = 0; x < length; x++) {
|
||||||
|
name.append(stringTable.charAt(rand.nextInt(stringTable.length())));
|
||||||
|
}
|
||||||
|
return name.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a Random Storage Type.
|
||||||
|
*
|
||||||
|
* @return - StorageType
|
||||||
|
*/
|
||||||
|
private StorageType getRandomStorageType() {
|
||||||
|
return StorageType.parseStorageType(rand.nextInt(3));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns random capacity, if the size is smaller than 10
|
||||||
|
* they are TBs otherwise the size is assigned to GB range.
|
||||||
|
*
|
||||||
|
* @return Long - Disk Size
|
||||||
|
*/
|
||||||
|
private long getRandomCapacity() {
|
||||||
|
int size = diskSizes[rand.nextInt(diskSizes.length)];
|
||||||
|
if (size < 10) {
|
||||||
|
return size * TB;
|
||||||
|
} else {
|
||||||
|
return size * GB;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Some value under 20% in these tests.
|
||||||
|
*/
|
||||||
|
private long getRandomReserved(long capacity) {
|
||||||
|
double rcap = capacity * 0.2d;
|
||||||
|
double randDouble = rand.nextDouble();
|
||||||
|
double temp = randDouble * rcap;
|
||||||
|
return (new Double(temp)).longValue();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Some value less that capacity - reserved.
|
||||||
|
*/
|
||||||
|
private long getRandomDfsUsed(long capacity, long reserved) {
|
||||||
|
double rcap = capacity - reserved;
|
||||||
|
double randDouble = rand.nextDouble();
|
||||||
|
double temp = randDouble * rcap;
|
||||||
|
return (new Double(temp)).longValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a Random Volume of a specific storageType.
|
||||||
|
*
|
||||||
|
* @return Volume
|
||||||
|
*/
|
||||||
|
public DiskBalancerVolume createRandomVolume() {
|
||||||
|
return createRandomVolume(getRandomStorageType());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a Random Volume for testing purpose.
|
||||||
|
*
|
||||||
|
* @param type - StorageType
|
||||||
|
*
|
||||||
|
* @return DiskBalancerVolume
|
||||||
|
*/
|
||||||
|
public DiskBalancerVolume createRandomVolume(StorageType type) {
|
||||||
|
DiskBalancerVolume volume = new DiskBalancerVolume();
|
||||||
|
volume.setPath("/tmp/disk/" + getRandomName(10));
|
||||||
|
volume.setStorageType(type.toString());
|
||||||
|
volume.setTransient(type.isTransient());
|
||||||
|
|
||||||
|
volume.setCapacity(getRandomCapacity());
|
||||||
|
volume.setReserved(getRandomReserved(volume.getCapacity()));
|
||||||
|
volume
|
||||||
|
.setUsed(getRandomDfsUsed(volume.getCapacity(), volume.getReserved()));
|
||||||
|
volume.setUuid(UUID.randomUUID().toString());
|
||||||
|
return volume;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a RandomVolumeSet.
|
||||||
|
*
|
||||||
|
* @param type -Storage Type
|
||||||
|
* @param diskCount - How many disks you need.
|
||||||
|
*
|
||||||
|
* @return volumeSet
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public DiskBalancerVolumeSet createRandomVolumeSet(StorageType type,
|
||||||
|
int diskCount)
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
Preconditions.checkState(diskCount > 0);
|
||||||
|
DiskBalancerVolumeSet volumeSet =
|
||||||
|
new DiskBalancerVolumeSet(type.isTransient());
|
||||||
|
for (int x = 0; x < diskCount; x++) {
|
||||||
|
volumeSet.addVolume(createRandomVolume(type));
|
||||||
|
}
|
||||||
|
assert (volumeSet.getVolumeCount() == diskCount);
|
||||||
|
return volumeSet;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a RandomDataNode.
|
||||||
|
*
|
||||||
|
* @param diskTypes - Storage types needed in the Node
|
||||||
|
* @param diskCount - Disk count - that many disks of each type is created
|
||||||
|
*
|
||||||
|
* @return DataNode
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public DiskBalancerDataNode createRandomDataNode(StorageType[] diskTypes,
|
||||||
|
int diskCount)
|
||||||
|
throws Exception {
|
||||||
|
Preconditions.checkState(diskTypes.length > 0);
|
||||||
|
Preconditions.checkState(diskCount > 0);
|
||||||
|
|
||||||
|
DiskBalancerDataNode node =
|
||||||
|
new DiskBalancerDataNode(UUID.randomUUID().toString());
|
||||||
|
|
||||||
|
for (StorageType t : diskTypes) {
|
||||||
|
DiskBalancerVolumeSet vSet = createRandomVolumeSet(t, diskCount);
|
||||||
|
for (DiskBalancerVolume v : vSet.getVolumes()) {
|
||||||
|
node.addVolume(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a RandomCluster.
|
||||||
|
*
|
||||||
|
* @param dataNodeCount - How many nodes you need
|
||||||
|
* @param diskTypes - StorageTypes you need in each node
|
||||||
|
* @param diskCount - How many disks you need of each type.
|
||||||
|
*
|
||||||
|
* @return Cluster
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public DiskBalancerCluster createRandCluster(int dataNodeCount,
|
||||||
|
StorageType[] diskTypes,
|
||||||
|
int diskCount)
|
||||||
|
|
||||||
|
throws Exception {
|
||||||
|
Preconditions.checkState(diskTypes.length > 0);
|
||||||
|
Preconditions.checkState(diskCount > 0);
|
||||||
|
Preconditions.checkState(dataNodeCount > 0);
|
||||||
|
NullConnector nullConnector = new NullConnector();
|
||||||
|
DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector);
|
||||||
|
|
||||||
|
// once we add these nodes into the connector, cluster will read them
|
||||||
|
// from the connector.
|
||||||
|
for (int x = 0; x < dataNodeCount; x++) {
|
||||||
|
nullConnector.addNode(createRandomDataNode(diskTypes, diskCount));
|
||||||
|
}
|
||||||
|
|
||||||
|
// with this call we have populated the cluster info
|
||||||
|
cluster.readClusterInfo();
|
||||||
|
return cluster;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,224 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p/>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p/>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
public class TestDataModels {
|
||||||
|
@Test
|
||||||
|
public void TestCreateRandomVolume() throws Exception {
|
||||||
|
DiskBalancerTestUtil util = new DiskBalancerTestUtil();
|
||||||
|
DiskBalancerVolume vol = util.createRandomVolume(StorageType.DISK);
|
||||||
|
Assert.assertNotNull(vol.getUuid());
|
||||||
|
Assert.assertNotNull(vol.getPath());
|
||||||
|
Assert.assertNotNull(vol.getStorageType());
|
||||||
|
Assert.assertFalse(vol.isFailed());
|
||||||
|
Assert.assertFalse(vol.isTransient());
|
||||||
|
Assert.assertTrue(vol.getCapacity() > 0);
|
||||||
|
Assert.assertTrue((vol.getCapacity() - vol.getReserved()) > 0);
|
||||||
|
Assert.assertTrue((vol.getReserved() + vol.getUsed()) < vol.getCapacity());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void TestCreateRandomVolumeSet() throws Exception {
|
||||||
|
DiskBalancerTestUtil util = new DiskBalancerTestUtil();
|
||||||
|
DiskBalancerVolumeSet vSet =
|
||||||
|
util.createRandomVolumeSet(StorageType.SSD, 10);
|
||||||
|
Assert.assertEquals(10, vSet.getVolumeCount());
|
||||||
|
Assert.assertEquals(StorageType.SSD.toString(),
|
||||||
|
vSet.getVolumes().get(0).getStorageType());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void TestCreateRandomDataNode() throws Exception {
|
||||||
|
DiskBalancerTestUtil util = new DiskBalancerTestUtil();
|
||||||
|
DiskBalancerDataNode node = util.createRandomDataNode(
|
||||||
|
new StorageType[]{StorageType.DISK, StorageType.RAM_DISK}, 10);
|
||||||
|
Assert.assertNotNull(node.getNodeDataDensity());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void TestDiskQueues() throws Exception {
|
||||||
|
DiskBalancerTestUtil util = new DiskBalancerTestUtil();
|
||||||
|
DiskBalancerDataNode node = util.createRandomDataNode(
|
||||||
|
new StorageType[]{StorageType.DISK, StorageType.RAM_DISK}, 3);
|
||||||
|
TreeSet<DiskBalancerVolume> sortedQueue =
|
||||||
|
node.getVolumeSets().get(StorageType.DISK.toString()).getSortedQueue();
|
||||||
|
|
||||||
|
List<DiskBalancerVolume> reverseList = new LinkedList<>();
|
||||||
|
List<DiskBalancerVolume> highList = new LinkedList<>();
|
||||||
|
int queueSize = sortedQueue.size();
|
||||||
|
for (int x = 0; x < queueSize; x++) {
|
||||||
|
reverseList.add(sortedQueue.first());
|
||||||
|
highList.add(sortedQueue.first());
|
||||||
|
}
|
||||||
|
Collections.reverse(reverseList);
|
||||||
|
|
||||||
|
for (int x = 0; x < queueSize; x++) {
|
||||||
|
|
||||||
|
Assert.assertEquals(reverseList.get(x).getCapacity(),
|
||||||
|
highList.get(x).getCapacity());
|
||||||
|
Assert.assertEquals(reverseList.get(x).getReserved(),
|
||||||
|
highList.get(x).getReserved());
|
||||||
|
Assert.assertEquals(reverseList.get(x).getUsed(),
|
||||||
|
highList.get(x).getUsed());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void TestNoBalancingNeededEvenDataSpread() throws Exception {
|
||||||
|
DiskBalancerTestUtil util = new DiskBalancerTestUtil();
|
||||||
|
DiskBalancerDataNode node =
|
||||||
|
new DiskBalancerDataNode(UUID.randomUUID().toString());
|
||||||
|
|
||||||
|
// create two disks which have exactly same data and isBalancing should
|
||||||
|
// say we don't need to balance.
|
||||||
|
DiskBalancerVolume v1 = util.createRandomVolume(StorageType.SSD);
|
||||||
|
v1.setCapacity(DiskBalancerTestUtil.TB);
|
||||||
|
v1.setReserved(100 * DiskBalancerTestUtil.GB);
|
||||||
|
v1.setUsed(500 * DiskBalancerTestUtil.GB);
|
||||||
|
|
||||||
|
DiskBalancerVolume v2 = util.createRandomVolume(StorageType.SSD);
|
||||||
|
v2.setCapacity(DiskBalancerTestUtil.TB);
|
||||||
|
v2.setReserved(100 * DiskBalancerTestUtil.GB);
|
||||||
|
v2.setUsed(500 * DiskBalancerTestUtil.GB);
|
||||||
|
|
||||||
|
node.addVolume(v1);
|
||||||
|
node.addVolume(v2);
|
||||||
|
|
||||||
|
for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) {
|
||||||
|
Assert.assertFalse(vsets.isBalancingNeeded(10.0f));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void TestNoBalancingNeededTransientDisks() throws Exception {
|
||||||
|
DiskBalancerTestUtil util = new DiskBalancerTestUtil();
|
||||||
|
DiskBalancerDataNode node =
|
||||||
|
new DiskBalancerDataNode(UUID.randomUUID().toString());
|
||||||
|
|
||||||
|
// create two disks which have different data sizes, but
|
||||||
|
// transient. isBalancing should say no balancing needed.
|
||||||
|
DiskBalancerVolume v1 = util.createRandomVolume(StorageType.RAM_DISK);
|
||||||
|
v1.setCapacity(DiskBalancerTestUtil.TB);
|
||||||
|
v1.setReserved(100 * DiskBalancerTestUtil.GB);
|
||||||
|
v1.setUsed(1 * DiskBalancerTestUtil.GB);
|
||||||
|
|
||||||
|
DiskBalancerVolume v2 = util.createRandomVolume(StorageType.RAM_DISK);
|
||||||
|
v2.setCapacity(DiskBalancerTestUtil.TB);
|
||||||
|
v2.setReserved(100 * DiskBalancerTestUtil.GB);
|
||||||
|
v2.setUsed(500 * DiskBalancerTestUtil.GB);
|
||||||
|
|
||||||
|
node.addVolume(v1);
|
||||||
|
node.addVolume(v2);
|
||||||
|
|
||||||
|
for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) {
|
||||||
|
Assert.assertFalse(vsets.isBalancingNeeded(10.0f));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void TestNoBalancingNeededFailedDisks() throws Exception {
|
||||||
|
DiskBalancerTestUtil util = new DiskBalancerTestUtil();
|
||||||
|
DiskBalancerDataNode node =
|
||||||
|
new DiskBalancerDataNode(UUID.randomUUID().toString());
|
||||||
|
|
||||||
|
// create two disks which have which are normal disks, but fail
|
||||||
|
// one of them. VolumeSet should say no balancing needed.
|
||||||
|
DiskBalancerVolume v1 = util.createRandomVolume(StorageType.SSD);
|
||||||
|
v1.setCapacity(DiskBalancerTestUtil.TB);
|
||||||
|
v1.setReserved(100 * DiskBalancerTestUtil.GB);
|
||||||
|
v1.setUsed(1 * DiskBalancerTestUtil.GB);
|
||||||
|
v1.setFailed(true);
|
||||||
|
|
||||||
|
DiskBalancerVolume v2 = util.createRandomVolume(StorageType.SSD);
|
||||||
|
v2.setCapacity(DiskBalancerTestUtil.TB);
|
||||||
|
v2.setReserved(100 * DiskBalancerTestUtil.GB);
|
||||||
|
v2.setUsed(500 * DiskBalancerTestUtil.GB);
|
||||||
|
|
||||||
|
node.addVolume(v1);
|
||||||
|
node.addVolume(v2);
|
||||||
|
|
||||||
|
for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) {
|
||||||
|
Assert.assertFalse(vsets.isBalancingNeeded(10.0f));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void TestNeedBalancingUnevenDataSpread() throws Exception {
|
||||||
|
DiskBalancerTestUtil util = new DiskBalancerTestUtil();
|
||||||
|
DiskBalancerDataNode node =
|
||||||
|
new DiskBalancerDataNode(UUID.randomUUID().toString());
|
||||||
|
|
||||||
|
DiskBalancerVolume v1 = util.createRandomVolume(StorageType.SSD);
|
||||||
|
v1.setCapacity(DiskBalancerTestUtil.TB);
|
||||||
|
v1.setReserved(100 * DiskBalancerTestUtil.GB);
|
||||||
|
v1.setUsed(0);
|
||||||
|
|
||||||
|
DiskBalancerVolume v2 = util.createRandomVolume(StorageType.SSD);
|
||||||
|
v2.setCapacity(DiskBalancerTestUtil.TB);
|
||||||
|
v2.setReserved(100 * DiskBalancerTestUtil.GB);
|
||||||
|
v2.setUsed(500 * DiskBalancerTestUtil.GB);
|
||||||
|
|
||||||
|
node.addVolume(v1);
|
||||||
|
node.addVolume(v2);
|
||||||
|
|
||||||
|
for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) {
|
||||||
|
Assert.assertTrue(vsets.isBalancingNeeded(10.0f));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void TestVolumeSerialize() throws Exception {
|
||||||
|
DiskBalancerTestUtil util = new DiskBalancerTestUtil();
|
||||||
|
DiskBalancerVolume volume = util.createRandomVolume(StorageType.DISK);
|
||||||
|
String originalString = volume.toJson();
|
||||||
|
DiskBalancerVolume parsedVolume =
|
||||||
|
DiskBalancerVolume.parseJson(originalString);
|
||||||
|
String parsedString = parsedVolume.toJson();
|
||||||
|
Assert.assertEquals(originalString, parsedString);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void TestClusterSerialize() throws Exception {
|
||||||
|
DiskBalancerTestUtil util = new DiskBalancerTestUtil();
|
||||||
|
|
||||||
|
// Create a Cluster with 3 datanodes, 3 disk types and 3 disks in each type
|
||||||
|
// that is 9 disks in each machine.
|
||||||
|
DiskBalancerCluster cluster = util.createRandCluster(3, new StorageType[]{
|
||||||
|
StorageType.DISK, StorageType.RAM_DISK, StorageType.SSD}, 3);
|
||||||
|
|
||||||
|
DiskBalancerCluster newCluster =
|
||||||
|
DiskBalancerCluster.parseJson(cluster.toJson());
|
||||||
|
Assert.assertEquals(cluster.getNodes(), newCluster.getNodes());
|
||||||
|
Assert
|
||||||
|
.assertEquals(cluster.getNodes().size(), newCluster.getNodes().size());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p/>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p/>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
|
||||||
|
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This connector allows user to create an in-memory cluster
|
||||||
|
* and is useful in testing.
|
||||||
|
*/
|
||||||
|
public class NullConnector implements ClusterConnector {
|
||||||
|
private final List<DiskBalancerDataNode> nodes = new LinkedList<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* getNodes function returns a list of DiskBalancerDataNodes.
|
||||||
|
*
|
||||||
|
* @return Array of DiskBalancerDataNodes
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public List<DiskBalancerDataNode> getNodes() throws Exception {
|
||||||
|
return nodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns info about the connector.
|
||||||
|
*
|
||||||
|
* @return String.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public String getConnectorInfo() {
|
||||||
|
return "Null Connector : No persistence, in-memory connector";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allows user to add nodes into this connector.
|
||||||
|
*
|
||||||
|
* @param node - Node to add
|
||||||
|
*/
|
||||||
|
public void addNode(DiskBalancerDataNode node) {
|
||||||
|
nodes.add(node);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue