HDFS-9545: DiskBalancer: Add Plan Command. Contributed by Anu Engineer.
This commit is contained in:
parent
1594b472bb
commit
75882ec0b0
|
@ -39,6 +39,7 @@ function hadoop_usage
|
||||||
hadoop_add_subcommand "debug" "run a Debug Admin to execute HDFS debug commands"
|
hadoop_add_subcommand "debug" "run a Debug Admin to execute HDFS debug commands"
|
||||||
hadoop_add_subcommand "dfs" "run a filesystem command on the file system"
|
hadoop_add_subcommand "dfs" "run a filesystem command on the file system"
|
||||||
hadoop_add_subcommand "dfsadmin" "run a DFS admin client"
|
hadoop_add_subcommand "dfsadmin" "run a DFS admin client"
|
||||||
|
hadoop_add_subcommand "diskbalancer" "Distributes data evenly among disks on a given node"
|
||||||
hadoop_add_subcommand "envvars" "display computed Hadoop environment variables"
|
hadoop_add_subcommand "envvars" "display computed Hadoop environment variables"
|
||||||
hadoop_add_subcommand "erasurecode" "run a HDFS ErasureCoding CLI"
|
hadoop_add_subcommand "erasurecode" "run a HDFS ErasureCoding CLI"
|
||||||
hadoop_add_subcommand "fetchdt" "fetch a delegation token from the NameNode"
|
hadoop_add_subcommand "fetchdt" "fetch a delegation token from the NameNode"
|
||||||
|
@ -125,6 +126,11 @@ function hdfscmd_case
|
||||||
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
|
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
|
||||||
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
|
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
|
||||||
;;
|
;;
|
||||||
|
diskbalancer)
|
||||||
|
HADOOP_CLASSNAME=org.apache.hadoop.hdfs.tools.DiskBalancer
|
||||||
|
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
|
||||||
|
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
|
||||||
|
;;
|
||||||
envvars)
|
envvars)
|
||||||
echo "JAVA_HOME='${JAVA_HOME}'"
|
echo "JAVA_HOME='${JAVA_HOME}'"
|
||||||
echo "HADOOP_HDFS_HOME='${HADOOP_HDFS_HOME}'"
|
echo "HADOOP_HDFS_HOME='${HADOOP_HDFS_HOME}'"
|
||||||
|
|
|
@ -0,0 +1,381 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer.command;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.commons.cli.CommandLine;
|
||||||
|
import org.apache.commons.cli.Option;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.Configured;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.tools.DiskBalancer;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
|
||||||
|
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.MalformedURLException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Common interface for command handling.
|
||||||
|
*/
|
||||||
|
public abstract class Command extends Configured {
|
||||||
|
static final Logger LOG = LoggerFactory.getLogger(Command.class);
|
||||||
|
private Map<String, String> validArgs = new HashMap<>();
|
||||||
|
private URI clusterURI;
|
||||||
|
private FileSystem fs = null;
|
||||||
|
private DiskBalancerCluster cluster = null;
|
||||||
|
|
||||||
|
private static final Path DEFAULT_LOG_DIR = new Path("/system/diskbalancer");
|
||||||
|
|
||||||
|
private Path diskBalancerLogs;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a command.
|
||||||
|
*/
|
||||||
|
public Command(Configuration conf) {
|
||||||
|
super(conf);
|
||||||
|
// These arguments are valid for all commands.
|
||||||
|
addValidCommandParameters(DiskBalancer.NAMENODEURI, "Name Node URI or " +
|
||||||
|
"file URI for cluster");
|
||||||
|
addValidCommandParameters(DiskBalancer.HELP, "Help for this command");
|
||||||
|
addValidCommandParameters("arg", "");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executes the Client Calls.
|
||||||
|
*
|
||||||
|
* @param cmd - CommandLine
|
||||||
|
* @throws IOException
|
||||||
|
* @throws URISyntaxException
|
||||||
|
*/
|
||||||
|
public abstract void execute(CommandLine cmd) throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets extended help for this command.
|
||||||
|
*
|
||||||
|
* @return Help Message
|
||||||
|
*/
|
||||||
|
protected abstract String getHelp();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verifies user provided URL.
|
||||||
|
*
|
||||||
|
* @param uri - UrlString
|
||||||
|
* @return URL
|
||||||
|
* @throws URISyntaxException, MalformedURLException
|
||||||
|
*/
|
||||||
|
protected URI verifyURI(String uri)
|
||||||
|
throws URISyntaxException, MalformedURLException {
|
||||||
|
if ((uri == null) || uri.isEmpty()) {
|
||||||
|
throw new MalformedURLException(
|
||||||
|
"A valid URI is needed to execute this command.");
|
||||||
|
}
|
||||||
|
return new URI(uri);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process the URI and return the cluster with nodes setup. This is used in
|
||||||
|
* all commands.
|
||||||
|
*
|
||||||
|
* @param cmd - CommandLine
|
||||||
|
* @return DiskBalancerCluster
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected DiskBalancerCluster readClusterInfo(CommandLine cmd) throws
|
||||||
|
Exception {
|
||||||
|
Preconditions.checkNotNull(cmd);
|
||||||
|
Preconditions
|
||||||
|
.checkState(cmd.getOptionValue(DiskBalancer.NAMENODEURI) != null,
|
||||||
|
"Required argument missing : uri");
|
||||||
|
|
||||||
|
setClusterURI(verifyURI(cmd.getOptionValue(DiskBalancer.NAMENODEURI)));
|
||||||
|
LOG.debug("using name node URI : {}", this.getClusterURI());
|
||||||
|
ClusterConnector connector = ConnectorFactory.getCluster(this.clusterURI,
|
||||||
|
getConf());
|
||||||
|
|
||||||
|
cluster = new DiskBalancerCluster(connector);
|
||||||
|
|
||||||
|
LOG.debug("Reading cluster info");
|
||||||
|
cluster.readClusterInfo();
|
||||||
|
return cluster;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Setup the outpath.
|
||||||
|
*
|
||||||
|
* @param path - Path or null to use default path.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
protected void setOutputPath(String path) throws IOException {
|
||||||
|
|
||||||
|
SimpleDateFormat format = new SimpleDateFormat("yyyy-MMM-dd-HH-mm-ss");
|
||||||
|
Date now = new Date();
|
||||||
|
|
||||||
|
fs = FileSystem.get(getClusterURI(), getConf());
|
||||||
|
if (path == null || path.isEmpty()) {
|
||||||
|
if (getClusterURI().getScheme().startsWith("file")) {
|
||||||
|
diskBalancerLogs = new Path(
|
||||||
|
System.getProperty("user.dir") + DEFAULT_LOG_DIR.toString() +
|
||||||
|
format.format(now));
|
||||||
|
} else {
|
||||||
|
diskBalancerLogs = new Path(DEFAULT_LOG_DIR.toString() +
|
||||||
|
format.format(now));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
diskBalancerLogs = new Path(path);
|
||||||
|
}
|
||||||
|
if (fs.exists(diskBalancerLogs)) {
|
||||||
|
LOG.error("Another Diskbalancer instance is running ? - Target " +
|
||||||
|
"Directory already exists. {}", diskBalancerLogs);
|
||||||
|
throw new IOException("Another DiskBalancer files already exist at the " +
|
||||||
|
"target location. " + diskBalancerLogs.toString());
|
||||||
|
}
|
||||||
|
fs.mkdirs(diskBalancerLogs);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the nodes to process.
|
||||||
|
*
|
||||||
|
* @param node - Node
|
||||||
|
*/
|
||||||
|
protected void setNodesToProcess(DiskBalancerDataNode node) {
|
||||||
|
List<DiskBalancerDataNode> nodelist = new LinkedList<>();
|
||||||
|
nodelist.add(node);
|
||||||
|
setNodesToProcess(nodelist);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the list of Nodes to process.
|
||||||
|
*
|
||||||
|
* @param nodes Nodes.
|
||||||
|
*/
|
||||||
|
protected void setNodesToProcess(List<DiskBalancerDataNode> nodes) {
|
||||||
|
if (cluster == null) {
|
||||||
|
throw new IllegalStateException("Set nodes to process invoked before " +
|
||||||
|
"initializing cluster. Illegal usage.");
|
||||||
|
}
|
||||||
|
cluster.setNodesToProcess(nodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a DiskBalancer Node from the Cluster or null if not found.
|
||||||
|
*
|
||||||
|
* @param nodeName - can the hostname, IP address or UUID of the node.
|
||||||
|
* @return - DataNode if found.
|
||||||
|
*/
|
||||||
|
DiskBalancerDataNode getNode(String nodeName) {
|
||||||
|
DiskBalancerDataNode node = null;
|
||||||
|
if (nodeName == null || nodeName.isEmpty()) {
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
if (cluster.getNodes().size() == 0) {
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
|
||||||
|
node = cluster.getNodeByName(nodeName);
|
||||||
|
if (node != null) {
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
|
||||||
|
node = cluster.getNodeByIPAddress(nodeName);
|
||||||
|
if (node != null) {
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
node = cluster.getNodeByUUID(nodeName);
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the node set from a file or a string.
|
||||||
|
*
|
||||||
|
* @param listArg - String File URL or a comma separated list of node names.
|
||||||
|
* @return Set of node names
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private Set<String> getNodeList(String listArg) throws IOException {
|
||||||
|
URL listURL;
|
||||||
|
String nodeData;
|
||||||
|
Set<String> resultSet = new TreeSet<>();
|
||||||
|
|
||||||
|
if ((listArg == null) || listArg.isEmpty()) {
|
||||||
|
return resultSet;
|
||||||
|
}
|
||||||
|
if (listArg.startsWith("file://")) {
|
||||||
|
listURL = new URL(listArg);
|
||||||
|
byte[] data = Files.readAllBytes(Paths.get(listURL.getPath()));
|
||||||
|
nodeData = new String(data, Charset.forName("UTF-8"));
|
||||||
|
} else {
|
||||||
|
nodeData = listArg;
|
||||||
|
}
|
||||||
|
|
||||||
|
String[] nodes = nodeData.split(",");
|
||||||
|
Collections.addAll(resultSet, nodes);
|
||||||
|
return resultSet;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies if the command line options are sane.
|
||||||
|
*
|
||||||
|
* @param commandName - Name of the command
|
||||||
|
* @param cmd - Parsed Command Line
|
||||||
|
*/
|
||||||
|
protected void verifyCommandOptions(String commandName, CommandLine cmd) {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Iterator<Option> iter = cmd.iterator();
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
Option opt = iter.next();
|
||||||
|
if (!validArgs.containsKey(opt.getArgName())) {
|
||||||
|
String errMessage = String
|
||||||
|
.format("%nInvalid argument found for command %s : %s%n",
|
||||||
|
commandName, opt.getArgName());
|
||||||
|
StringBuilder validArguments = new StringBuilder();
|
||||||
|
validArguments.append("Valid arguments are : %n");
|
||||||
|
for (Map.Entry<String, String> args : validArgs.entrySet()) {
|
||||||
|
String key = args.getKey();
|
||||||
|
String desc = args.getValue();
|
||||||
|
String s = String.format("\t %s : %s %n", key, desc);
|
||||||
|
validArguments.append(s);
|
||||||
|
}
|
||||||
|
LOG.error(errMessage + validArguments.toString());
|
||||||
|
throw new IllegalArgumentException("Invalid Arguments found.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets cluster URL.
|
||||||
|
*
|
||||||
|
* @return - URL
|
||||||
|
*/
|
||||||
|
public URI getClusterURI() {
|
||||||
|
return clusterURI;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set cluster URL.
|
||||||
|
*
|
||||||
|
* @param clusterURI - URL
|
||||||
|
*/
|
||||||
|
public void setClusterURI(URI clusterURI) {
|
||||||
|
this.clusterURI = clusterURI;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copied from DFSAdmin.java. -- Creates a connection to dataNode.
|
||||||
|
*
|
||||||
|
* @param datanode - dataNode.
|
||||||
|
* @return ClientDataNodeProtocol
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public ClientDatanodeProtocol getDataNodeProxy(String datanode)
|
||||||
|
throws IOException {
|
||||||
|
InetSocketAddress datanodeAddr = NetUtils.createSocketAddr(datanode);
|
||||||
|
|
||||||
|
// For datanode proxy the server principal should be DN's one.
|
||||||
|
getConf().set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
|
||||||
|
getConf().get(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, ""));
|
||||||
|
|
||||||
|
// Create the client
|
||||||
|
ClientDatanodeProtocol dnProtocol =
|
||||||
|
DFSUtilClient.createClientDatanodeProtocolProxy(datanodeAddr, getUGI(),
|
||||||
|
getConf(), NetUtils.getSocketFactory(getConf(),
|
||||||
|
ClientDatanodeProtocol
|
||||||
|
.class));
|
||||||
|
return dnProtocol;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns UGI.
|
||||||
|
*
|
||||||
|
* @return UserGroupInformation.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private static UserGroupInformation getUGI()
|
||||||
|
throws IOException {
|
||||||
|
return UserGroupInformation.getCurrentUser();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a file created in the cluster.
|
||||||
|
*
|
||||||
|
* @param fileName - fileName to open.
|
||||||
|
* @return OutputStream.
|
||||||
|
*/
|
||||||
|
protected FSDataOutputStream create(String fileName) throws IOException {
|
||||||
|
return fs.create(new Path(this.diskBalancerLogs, fileName));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the output path where the plan and snapshot gets written.
|
||||||
|
*
|
||||||
|
* @return Path
|
||||||
|
*/
|
||||||
|
protected Path getOutputPath() {
|
||||||
|
return diskBalancerLogs;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds valid params to the valid args table.
|
||||||
|
*
|
||||||
|
* @param key
|
||||||
|
* @param desc
|
||||||
|
*/
|
||||||
|
protected void addValidCommandParameters(String key, String desc) {
|
||||||
|
validArgs.put(key, desc);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the cluster.
|
||||||
|
*
|
||||||
|
* @return Cluster.
|
||||||
|
*/
|
||||||
|
protected DiskBalancerCluster getCluster() {
|
||||||
|
return cluster;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,217 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer.command;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.commons.cli.CommandLine;
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.tools.DiskBalancer;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class that implements Plan Command.
|
||||||
|
* <p>
|
||||||
|
* Plan command reads the Cluster Info and creates a plan for specified data
|
||||||
|
* node or a set of Data nodes.
|
||||||
|
* <p>
|
||||||
|
* It writes the output to a default location unless changed by the user.
|
||||||
|
*/
|
||||||
|
public class PlanCommand extends Command {
|
||||||
|
private double thresholdPercentage;
|
||||||
|
private int bandwidth;
|
||||||
|
private int maxError;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a plan command.
|
||||||
|
*/
|
||||||
|
public PlanCommand(Configuration conf) {
|
||||||
|
super(conf);
|
||||||
|
this.thresholdPercentage = 1;
|
||||||
|
this.bandwidth = 0;
|
||||||
|
this.maxError = 0;
|
||||||
|
addValidCommandParameters(DiskBalancer.OUTFILE, "Output file");
|
||||||
|
addValidCommandParameters(DiskBalancer.BANDWIDTH, "Maximum Bandwidth to " +
|
||||||
|
"be used while copying.");
|
||||||
|
addValidCommandParameters(DiskBalancer.THRESHOLD, "Percentage skew that " +
|
||||||
|
"we tolerate before diskbalancer starts working.");
|
||||||
|
addValidCommandParameters(DiskBalancer.MAXERROR, "Max errors to tolerate " +
|
||||||
|
"between 2 disks");
|
||||||
|
addValidCommandParameters(DiskBalancer.NODE, "Name / Address of the node.");
|
||||||
|
addValidCommandParameters(DiskBalancer.VERBOSE, "Run plan command in " +
|
||||||
|
"verbose mode.");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs the plan command. This command can be run with various options like
|
||||||
|
* <p>
|
||||||
|
* -plan -node IP -plan -node hostName -plan -node DatanodeUUID
|
||||||
|
*
|
||||||
|
* @param cmd - CommandLine
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void execute(CommandLine cmd) throws Exception {
|
||||||
|
LOG.debug("Processing Plan Command.");
|
||||||
|
Preconditions.checkState(cmd.hasOption(DiskBalancer.PLAN));
|
||||||
|
verifyCommandOptions(DiskBalancer.PLAN, cmd);
|
||||||
|
|
||||||
|
if (!cmd.hasOption(DiskBalancer.NODE)) {
|
||||||
|
throw new IllegalArgumentException("A node name is required to create a" +
|
||||||
|
" plan.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cmd.hasOption(DiskBalancer.BANDWIDTH)) {
|
||||||
|
this.bandwidth = Integer.parseInt(cmd.getOptionValue(DiskBalancer
|
||||||
|
.BANDWIDTH));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cmd.hasOption(DiskBalancer.MAXERROR)) {
|
||||||
|
this.maxError = Integer.parseInt(cmd.getOptionValue(DiskBalancer
|
||||||
|
.MAXERROR));
|
||||||
|
}
|
||||||
|
|
||||||
|
readClusterInfo(cmd);
|
||||||
|
String output = null;
|
||||||
|
if (cmd.hasOption(DiskBalancer.OUTFILE)) {
|
||||||
|
output = cmd.getOptionValue(DiskBalancer.OUTFILE);
|
||||||
|
}
|
||||||
|
setOutputPath(output);
|
||||||
|
|
||||||
|
DiskBalancerDataNode node = getNode(cmd.getOptionValue(DiskBalancer.NODE));
|
||||||
|
if (node == null) {
|
||||||
|
throw new IllegalArgumentException("Unable to find the specified node. " +
|
||||||
|
cmd.getOptionValue(DiskBalancer.NODE));
|
||||||
|
}
|
||||||
|
this.thresholdPercentage = getThresholdPercentage(cmd);
|
||||||
|
setNodesToProcess(node);
|
||||||
|
|
||||||
|
List<NodePlan> plans = getCluster().computePlan(this.thresholdPercentage);
|
||||||
|
setPlanParams(plans);
|
||||||
|
|
||||||
|
LOG.info("Writing plan to : {}", getOutputPath());
|
||||||
|
System.out.printf("Writing plan to : %s%n", getOutputPath());
|
||||||
|
|
||||||
|
try(FSDataOutputStream beforeStream = create(String.format(
|
||||||
|
DiskBalancer.BEFORE_TEMPLATE,
|
||||||
|
cmd.getOptionValue(DiskBalancer.NODE)))) {
|
||||||
|
beforeStream.write(getCluster().toJson()
|
||||||
|
.getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
|
||||||
|
try(FSDataOutputStream planStream = create(String.format(
|
||||||
|
DiskBalancer.PLAN_TEMPLATE,
|
||||||
|
cmd.getOptionValue(DiskBalancer.NODE)))) {
|
||||||
|
planStream.write(getPlan(plans).getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cmd.hasOption(DiskBalancer.VERBOSE)) {
|
||||||
|
printToScreen(plans);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets extended help for this command.
|
||||||
|
*
|
||||||
|
* @return Help Message
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected String getHelp() {
|
||||||
|
return "This commands creates a disk balancer plan for given datanode";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get Threshold for planning purpose.
|
||||||
|
*
|
||||||
|
* @param cmd - Command Line Argument.
|
||||||
|
* @return double
|
||||||
|
*/
|
||||||
|
private double getThresholdPercentage(CommandLine cmd) {
|
||||||
|
Double value = 0.0;
|
||||||
|
if (cmd.hasOption(DiskBalancer.THRESHOLD)) {
|
||||||
|
value = Double.parseDouble(cmd.getOptionValue(DiskBalancer.THRESHOLD));
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((value <= 0.0) || (value > 100.0)) {
|
||||||
|
value = getConf().getDouble(
|
||||||
|
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT,
|
||||||
|
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT_DEFAULT);
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prints a quick summary of the plan to screen.
|
||||||
|
*
|
||||||
|
* @param plans - List of NodePlans.
|
||||||
|
*/
|
||||||
|
static private void printToScreen(List<NodePlan> plans) {
|
||||||
|
System.out.println("\nPlan :\n");
|
||||||
|
System.out.println(StringUtils.repeat("=", 80));
|
||||||
|
System.out.println("Source Disk\t\t Dest.Disk\t\t Move Size\t Type\n ");
|
||||||
|
for (NodePlan plan : plans) {
|
||||||
|
for (Step step : plan.getVolumeSetPlans()) {
|
||||||
|
System.out.println(String.format("%s\t%s\t%s\t%s",
|
||||||
|
step.getSourceVolume().getPath(),
|
||||||
|
step.getDestinationVolume().getPath(),
|
||||||
|
step.getSizeString(step.getBytesToMove()),
|
||||||
|
step.getDestinationVolume().getStorageType()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
System.out.println(StringUtils.repeat("=", 80));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets user specified plan parameters.
|
||||||
|
*
|
||||||
|
* @param plans - list of plans.
|
||||||
|
*/
|
||||||
|
private void setPlanParams(List<NodePlan> plans) {
|
||||||
|
for (NodePlan plan : plans) {
|
||||||
|
for (Step step : plan.getVolumeSetPlans()) {
|
||||||
|
if (this.bandwidth > 0) {
|
||||||
|
step.setBandwidth(this.bandwidth);
|
||||||
|
}
|
||||||
|
if (this.maxError > 0) {
|
||||||
|
step.setMaxDiskErrors(this.maxError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a Json represenation of the plans.
|
||||||
|
*
|
||||||
|
* @param plan - List of plans.
|
||||||
|
* @return String.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private String getPlan(List<NodePlan> plan) throws IOException {
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
return mapper.writeValueAsString(plan);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
Commands for disk balancer command line tool.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer.command;
|
|
@ -31,12 +31,13 @@ import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Paths;
|
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
@ -76,6 +77,13 @@ public class DiskBalancerCluster {
|
||||||
|
|
||||||
@JsonIgnore
|
@JsonIgnore
|
||||||
private List<DiskBalancerDataNode> nodesToProcess;
|
private List<DiskBalancerDataNode> nodesToProcess;
|
||||||
|
@JsonIgnore
|
||||||
|
private final Map<String, DiskBalancerDataNode> ipList;
|
||||||
|
@JsonIgnore
|
||||||
|
private final Map<String, DiskBalancerDataNode> hostNames;
|
||||||
|
@JsonIgnore
|
||||||
|
private final Map<String, DiskBalancerDataNode> hostUUID;
|
||||||
|
|
||||||
private float threshold;
|
private float threshold;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -85,7 +93,9 @@ public class DiskBalancerCluster {
|
||||||
nodes = new LinkedList<>();
|
nodes = new LinkedList<>();
|
||||||
exclusionList = new TreeSet<>();
|
exclusionList = new TreeSet<>();
|
||||||
inclusionList = new TreeSet<>();
|
inclusionList = new TreeSet<>();
|
||||||
|
ipList = new HashMap<>();
|
||||||
|
hostNames = new HashMap<>();
|
||||||
|
hostUUID = new HashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -95,10 +105,9 @@ public class DiskBalancerCluster {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public DiskBalancerCluster(ClusterConnector connector) throws IOException {
|
public DiskBalancerCluster(ClusterConnector connector) throws IOException {
|
||||||
|
this();
|
||||||
Preconditions.checkNotNull(connector);
|
Preconditions.checkNotNull(connector);
|
||||||
clusterConnector = connector;
|
clusterConnector = connector;
|
||||||
exclusionList = new TreeSet<>();
|
|
||||||
inclusionList = new TreeSet<>();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -119,8 +128,25 @@ public class DiskBalancerCluster {
|
||||||
*/
|
*/
|
||||||
public void readClusterInfo() throws Exception {
|
public void readClusterInfo() throws Exception {
|
||||||
Preconditions.checkNotNull(clusterConnector);
|
Preconditions.checkNotNull(clusterConnector);
|
||||||
LOG.info("Using connector : " + clusterConnector.getConnectorInfo());
|
LOG.debug("Using connector : {}" , clusterConnector.getConnectorInfo());
|
||||||
nodes = clusterConnector.getNodes();
|
nodes = clusterConnector.getNodes();
|
||||||
|
for(DiskBalancerDataNode node : nodes) {
|
||||||
|
|
||||||
|
if(node.getDataNodeIP()!= null && !node.getDataNodeIP().isEmpty()) {
|
||||||
|
ipList.put(node.getDataNodeIP(), node);
|
||||||
|
}
|
||||||
|
|
||||||
|
if(node.getDataNodeName() != null && !node.getDataNodeName().isEmpty()) {
|
||||||
|
// TODO : should we support Internationalized Domain Names ?
|
||||||
|
// Disk balancer assumes that host names are ascii. If not
|
||||||
|
// end user can always balance the node via IP address or DataNode UUID.
|
||||||
|
hostNames.put(node.getDataNodeName().toLowerCase(Locale.US), node);
|
||||||
|
}
|
||||||
|
|
||||||
|
if(node.getDataNodeUUID() != null && !node.getDataNodeUUID().isEmpty()) {
|
||||||
|
hostUUID.put(node.getDataNodeUUID(), node);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -258,30 +284,6 @@ public class DiskBalancerCluster {
|
||||||
FileUtils.writeStringToFile(outFile, json);
|
FileUtils.writeStringToFile(outFile, json);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates an Output directory for the cluster output.
|
|
||||||
*
|
|
||||||
* @throws IOException - On failure to create an new directory
|
|
||||||
*/
|
|
||||||
public void createOutPutDirectory() throws IOException {
|
|
||||||
if (Files.exists(Paths.get(this.getOutput()))) {
|
|
||||||
LOG.error("An output directory already exists at this location. Path : " +
|
|
||||||
this.getOutput());
|
|
||||||
throw new IOException(
|
|
||||||
"An output directory already exists at this location. Path : " +
|
|
||||||
this.getOutput());
|
|
||||||
}
|
|
||||||
|
|
||||||
File f = new File(this.getOutput());
|
|
||||||
if (!f.mkdirs()) {
|
|
||||||
LOG.error("Unable to create the output directory. Path : " + this
|
|
||||||
.getOutput());
|
|
||||||
throw new IOException(
|
|
||||||
"Unable to create the output directory. Path : " + this.getOutput());
|
|
||||||
}
|
|
||||||
LOG.info("Output directory created. Path : " + this.getOutput());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compute plan takes a node and constructs a planner that creates a plan that
|
* Compute plan takes a node and constructs a planner that creates a plan that
|
||||||
* we would like to follow.
|
* we would like to follow.
|
||||||
|
@ -294,7 +296,7 @@ public class DiskBalancerCluster {
|
||||||
* @param thresholdPercent - in percentage
|
* @param thresholdPercent - in percentage
|
||||||
* @return list of NodePlans
|
* @return list of NodePlans
|
||||||
*/
|
*/
|
||||||
public List<NodePlan> computePlan(float thresholdPercent) {
|
public List<NodePlan> computePlan(double thresholdPercent) {
|
||||||
List<NodePlan> planList = new LinkedList<>();
|
List<NodePlan> planList = new LinkedList<>();
|
||||||
|
|
||||||
if (nodesToProcess == null) {
|
if (nodesToProcess == null) {
|
||||||
|
@ -366,11 +368,24 @@ public class DiskBalancerCluster {
|
||||||
* @return DiskBalancerDataNode.
|
* @return DiskBalancerDataNode.
|
||||||
*/
|
*/
|
||||||
public DiskBalancerDataNode getNodeByUUID(String uuid) {
|
public DiskBalancerDataNode getNodeByUUID(String uuid) {
|
||||||
for(DiskBalancerDataNode node : this.getNodes()) {
|
return hostUUID.get(uuid);
|
||||||
if(node.getDataNodeUUID().equals(uuid)) {
|
}
|
||||||
return node;
|
|
||||||
}
|
/**
|
||||||
}
|
* Returns a node by IP Address.
|
||||||
return null;
|
* @param ipAddresss - IP address String.
|
||||||
|
* @return DiskBalancerDataNode.
|
||||||
|
*/
|
||||||
|
public DiskBalancerDataNode getNodeByIPAddress(String ipAddresss) {
|
||||||
|
return ipList.get(ipAddresss);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a node by hostName.
|
||||||
|
* @param hostName - HostName.
|
||||||
|
* @return DiskBalancerDataNode.
|
||||||
|
*/
|
||||||
|
public DiskBalancerDataNode getNodeByName(String hostName) {
|
||||||
|
return hostNames.get(hostName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -220,7 +220,7 @@ public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
|
||||||
* @param threshold - Percentage
|
* @param threshold - Percentage
|
||||||
* @return true or false
|
* @return true or false
|
||||||
*/
|
*/
|
||||||
public boolean isBalancingNeeded(float threshold) {
|
public boolean isBalancingNeeded(double threshold) {
|
||||||
for (DiskBalancerVolumeSet vSet : getVolumeSets().values()) {
|
for (DiskBalancerVolumeSet vSet : getVolumeSets().values()) {
|
||||||
if (vSet.isBalancingNeeded(threshold)) {
|
if (vSet.isBalancingNeeded(threshold)) {
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -267,7 +267,7 @@ public class DiskBalancerVolumeSet {
|
||||||
*
|
*
|
||||||
* @return true if balancing is needed false otherwise.
|
* @return true if balancing is needed false otherwise.
|
||||||
*/
|
*/
|
||||||
public boolean isBalancingNeeded(float thresholdPercentage) {
|
public boolean isBalancingNeeded(double thresholdPercentage) {
|
||||||
double threshold = thresholdPercentage / 100.0d;
|
double threshold = thresholdPercentage / 100.0d;
|
||||||
|
|
||||||
if(volumes == null || volumes.size() <= 1) {
|
if(volumes == null || volumes.size() <= 1) {
|
||||||
|
|
|
@ -44,7 +44,7 @@ public class GreedyPlanner implements Planner {
|
||||||
public static final long TB = GB * 1024L;
|
public static final long TB = GB * 1024L;
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(GreedyPlanner.class);
|
LoggerFactory.getLogger(GreedyPlanner.class);
|
||||||
private final float threshold;
|
private final double threshold;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a greedy planner.
|
* Constructs a greedy planner.
|
||||||
|
@ -52,7 +52,7 @@ public class GreedyPlanner implements Planner {
|
||||||
* @param threshold - Disk tolerance that we are ok with
|
* @param threshold - Disk tolerance that we are ok with
|
||||||
* @param node - node on which this planner is operating upon
|
* @param node - node on which this planner is operating upon
|
||||||
*/
|
*/
|
||||||
public GreedyPlanner(float threshold, DiskBalancerDataNode node) {
|
public GreedyPlanner(double threshold, DiskBalancerDataNode node) {
|
||||||
this.threshold = threshold;
|
this.threshold = threshold;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,9 +5,9 @@
|
||||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
* "License"); you may not use this file except in compliance with the License.
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
* You may obtain a copy of the License at
|
* You may obtain a copy of the License at
|
||||||
* <p/>
|
* <p>
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
* <p/>
|
* <p>
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
@ -38,7 +38,7 @@ public final class PlannerFactory {
|
||||||
* @return Planner
|
* @return Planner
|
||||||
*/
|
*/
|
||||||
public static Planner getPlanner(String plannerName,
|
public static Planner getPlanner(String plannerName,
|
||||||
DiskBalancerDataNode node, float threshold) {
|
DiskBalancerDataNode node, double threshold) {
|
||||||
if (plannerName.equals(GREEDY_PLANNER)) {
|
if (plannerName.equals(GREEDY_PLANNER)) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
String message = String
|
String message = String
|
||||||
|
|
|
@ -0,0 +1,260 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.tools;
|
||||||
|
|
||||||
|
import org.apache.commons.cli.BasicParser;
|
||||||
|
import org.apache.commons.cli.CommandLine;
|
||||||
|
import org.apache.commons.cli.HelpFormatter;
|
||||||
|
import org.apache.commons.cli.Option;
|
||||||
|
import org.apache.commons.cli.Options;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.Configured;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.command.Command;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.command.PlanCommand;
|
||||||
|
import org.apache.hadoop.util.Tool;
|
||||||
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DiskBalancer is a tool that can be used to ensure that data is spread evenly
|
||||||
|
* across volumes of same storage type.
|
||||||
|
* <p>
|
||||||
|
* For example, if you have 3 disks, with 100 GB , 600 GB and 200 GB on each
|
||||||
|
* disk, this tool will ensure that each disk will have 300 GB.
|
||||||
|
* <p>
|
||||||
|
* This tool can be run while data nodes are fully functional.
|
||||||
|
* <p>
|
||||||
|
* At very high level diskbalancer computes a set of moves that will make disk
|
||||||
|
* utilization equal and then those moves are executed by the datanode.
|
||||||
|
*/
|
||||||
|
public class DiskBalancer extends Configured implements Tool {
|
||||||
|
/**
|
||||||
|
* Construct a DiskBalancer.
|
||||||
|
*
|
||||||
|
* @param conf
|
||||||
|
*/
|
||||||
|
public DiskBalancer(Configuration conf) {
|
||||||
|
super(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* NameNodeURI can point to either a real namenode, or a json file that
|
||||||
|
* contains the diskBalancer data in json form, that jsonNodeConnector knows
|
||||||
|
* how to deserialize.
|
||||||
|
* <p>
|
||||||
|
* Expected formats are :
|
||||||
|
* <p>
|
||||||
|
* hdfs://namenode.uri or file:///data/myCluster.json
|
||||||
|
*/
|
||||||
|
public static final String NAMENODEURI = "uri";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes a plan for a given set of nodes.
|
||||||
|
*/
|
||||||
|
public static final String PLAN = "plan";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Output file name, for commands like report, plan etc. This is an optional
|
||||||
|
* argument, by default diskbalancer will write all its output to
|
||||||
|
* /system/reports/diskbalancer of the current cluster it is operating
|
||||||
|
* against.
|
||||||
|
*/
|
||||||
|
public static final String OUTFILE = "out";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Help for the program.
|
||||||
|
*/
|
||||||
|
public static final String HELP = "help";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Percentage of data unevenness that we are willing to live with. For example
|
||||||
|
* - a value like 10 indicates that we are okay with 10 % +/- from
|
||||||
|
* idealStorage Target.
|
||||||
|
*/
|
||||||
|
public static final String THRESHOLD = "thresholdPercentage";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specifies the maximum disk bandwidth to use per second.
|
||||||
|
*/
|
||||||
|
public static final String BANDWIDTH = "bandwidth";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specifies the maximum errors to tolerate.
|
||||||
|
*/
|
||||||
|
public static final String MAXERROR = "maxerror";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Node name or IP against which Disk Balancer is being run.
|
||||||
|
*/
|
||||||
|
public static final String NODE = "node";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs the command in verbose mode.
|
||||||
|
*/
|
||||||
|
public static final String VERBOSE = "v";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Template for the Before File. It is node.before.json.
|
||||||
|
*/
|
||||||
|
public static final String BEFORE_TEMPLATE = "%s.before.json";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Template for the plan file. it is node.plan.json.
|
||||||
|
*/
|
||||||
|
public static final String PLAN_TEMPLATE = "%s.plan.json";
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(DiskBalancer.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Main for the DiskBalancer Command handling.
|
||||||
|
*
|
||||||
|
* @param argv - System Args Strings[]
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public static void main(String[] argv) throws Exception {
|
||||||
|
DiskBalancer shell = new DiskBalancer(new HdfsConfiguration());
|
||||||
|
int res = 0;
|
||||||
|
try {
|
||||||
|
res = ToolRunner.run(shell, argv);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.error(ex.toString());
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
System.exit(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute the command with the given arguments.
|
||||||
|
*
|
||||||
|
* @param args command specific arguments.
|
||||||
|
* @return exit code.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int run(String[] args) throws Exception {
|
||||||
|
Options opts = getOpts();
|
||||||
|
CommandLine cmd = parseArgs(args, opts);
|
||||||
|
return dispatch(cmd, opts);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* returns the Command Line Options.
|
||||||
|
*
|
||||||
|
* @return Options
|
||||||
|
*/
|
||||||
|
private Options getOpts() {
|
||||||
|
Options opts = new Options();
|
||||||
|
addCommands(opts);
|
||||||
|
return opts;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds commands that we handle to opts.
|
||||||
|
*
|
||||||
|
* @param opt - Optins
|
||||||
|
*/
|
||||||
|
private void addCommands(Options opt) {
|
||||||
|
|
||||||
|
Option nameNodeUri =
|
||||||
|
new Option(NAMENODEURI, true, "NameNode URI. e.g http://namenode" +
|
||||||
|
".mycluster.com or file:///myCluster" +
|
||||||
|
".json");
|
||||||
|
opt.addOption(nameNodeUri);
|
||||||
|
|
||||||
|
Option outFile =
|
||||||
|
new Option(OUTFILE, true, "File to write output to, if not specified " +
|
||||||
|
"defaults will be used." +
|
||||||
|
"e.g -out outfile.txt");
|
||||||
|
opt.addOption(outFile);
|
||||||
|
|
||||||
|
Option plan = new Option(PLAN, false, "write plan to the default file");
|
||||||
|
opt.addOption(plan);
|
||||||
|
|
||||||
|
Option bandwidth = new Option(BANDWIDTH, true, "Maximum disk bandwidth to" +
|
||||||
|
" be consumed by diskBalancer. " +
|
||||||
|
"Expressed as MBs per second.");
|
||||||
|
opt.addOption(bandwidth);
|
||||||
|
|
||||||
|
Option threshold = new Option(THRESHOLD, true, "Percentage skew that we " +
|
||||||
|
"tolerate before diskbalancer starts working or stops when reaching " +
|
||||||
|
"that range.");
|
||||||
|
opt.addOption(threshold);
|
||||||
|
|
||||||
|
Option maxErrors = new Option(MAXERROR, true, "Describes how many errors " +
|
||||||
|
"can be tolerated while copying between a pair of disks.");
|
||||||
|
opt.addOption(maxErrors);
|
||||||
|
|
||||||
|
Option node = new Option(NODE, true, "Node Name or IP");
|
||||||
|
opt.addOption(node);
|
||||||
|
|
||||||
|
Option help =
|
||||||
|
new Option(HELP, true, "Help about a command or this message");
|
||||||
|
opt.addOption(help);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This function parses all command line arguments and returns the appropriate
|
||||||
|
* values.
|
||||||
|
*
|
||||||
|
* @param argv - Argv from main
|
||||||
|
* @return CommandLine
|
||||||
|
*/
|
||||||
|
private CommandLine parseArgs(String[] argv, Options opts)
|
||||||
|
throws org.apache.commons.cli.ParseException {
|
||||||
|
BasicParser parser = new BasicParser();
|
||||||
|
return parser.parse(opts, argv);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dispatches calls to the right command Handler classes.
|
||||||
|
*
|
||||||
|
* @param cmd - CommandLine
|
||||||
|
* @throws IOException
|
||||||
|
* @throws URISyntaxException
|
||||||
|
*/
|
||||||
|
private int dispatch(CommandLine cmd, Options opts)
|
||||||
|
throws IOException, URISyntaxException {
|
||||||
|
Command currentCommand = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (cmd.hasOption(DiskBalancer.PLAN)) {
|
||||||
|
currentCommand = new PlanCommand(getConf());
|
||||||
|
} else {
|
||||||
|
HelpFormatter helpFormatter = new HelpFormatter();
|
||||||
|
helpFormatter.printHelp(80, "hdfs diskbalancer -uri [args]",
|
||||||
|
"disk balancer commands", opts,
|
||||||
|
"Please correct your command and try again.");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
currentCommand.execute(cmd);
|
||||||
|
|
||||||
|
} catch (Exception ex) {
|
||||||
|
System.err.printf(ex.getMessage());
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -50,8 +50,6 @@ import org.junit.rules.ExpectedException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
|
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
|
||||||
|
|
Loading…
Reference in New Issue