HDFS-9461. DiskBalancer: Add Report Command. Contributed by Xiaobing Zhou.

Anu Engineer 2016-06-10 21:15:54 -07:00 committed by Arpit Agarwal
parent 121142cf95
commit b502102bb1
6 changed files with 10136 additions and 2 deletions

org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java View File

@@ -21,6 +21,8 @@ package org.apache.hadoop.hdfs.server.diskbalancer.command;
import com.google.common.base.Preconditions;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -70,6 +72,7 @@ public abstract class Command extends Configured {
private URI clusterURI;
private FileSystem fs = null;
private DiskBalancerCluster cluster = null;
private int topNodes;
private static final Path DEFAULT_LOG_DIR = new Path("/system/diskbalancer");
@@ -83,6 +86,7 @@ public abstract class Command extends Configured {
// These arguments are valid for all commands.
addValidCommandParameters(DiskBalancer.HELP, "Help for this command");
addValidCommandParameters("arg", "");
topNodes = 0;
}
/**
@@ -391,4 +395,67 @@
protected DiskBalancerCluster getCluster() {
return cluster;
}
/**
* Returns the default top number of nodes.
* @return default top number of nodes.
*/
protected int getDefaultTop() {
return DiskBalancer.DEFAULT_TOP;
}
/**
* Puts an output line to the log and the string buffer.
*/
protected void recordOutput(final StrBuilder result,
final String outputLine) {
LOG.info(outputLine);
result.appendln(outputLine);
}
/**
* Parses the top number of nodes to be processed.
* @return top number of nodes to be processed.
*/
protected int parseTopNodes(final CommandLine cmd, final StrBuilder result) {
String outputLine = "";
int nodes = 0;
final String topVal = cmd.getOptionValue(DiskBalancer.TOP);
if (StringUtils.isBlank(topVal)) {
outputLine = String.format(
"No top limit specified, using default top value %d.",
getDefaultTop());
LOG.info(outputLine);
result.appendln(outputLine);
nodes = getDefaultTop();
} else {
try {
nodes = Integer.parseInt(topVal);
} catch (NumberFormatException nfe) {
outputLine = String.format(
"Top limit input is not numeric, using default top value %d.",
getDefaultTop());
LOG.info(outputLine);
result.appendln(outputLine);
nodes = getDefaultTop();
}
}
return Math.min(nodes, cluster.getNodes().size());
}
/**
* Sets the top number of nodes to be processed.
*/
public void setTopNodes(int topNodes) {
this.topNodes = topNodes;
}
/**
* Gets the top number of nodes to be processed.
* @return top number of nodes to be processed.
*/
public int getTopNodes() {
return topNodes;
}
}
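
For illustration, here is a minimal standalone sketch of the fallback-and-clamp logic that parseTopNodes implements above; the class and helper names are hypothetical and not part of this commit:

// Hypothetical restatement of parseTopNodes' fallback-and-clamp logic:
// blank and non-numeric -top values fall back to the default
// (DiskBalancer.DEFAULT_TOP, i.e. 100), and the result is clamped to the
// number of nodes in the cluster.
final class TopNodesSketch {
  static int resolveTop(String topVal, int defaultTop, int clusterSize) {
    int nodes;
    if (topVal == null || topVal.trim().isEmpty()) {
      nodes = defaultTop;
    } else {
      try {
        nodes = Integer.parseInt(topVal);
      } catch (NumberFormatException nfe) {
        nodes = defaultTop;
      }
    }
    return Math.min(nodes, clusterSize);
  }
}

For instance, resolveTop("128", 100, 64) returns 64, matching the "Reporting top 64" output that testReportMoreThanTotal asserts for -top 128 against the 64-node test cluster below.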

org/apache/hadoop/hdfs/server/diskbalancer/command/ReportCommand.java View File

@@ -0,0 +1,197 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.server.diskbalancer.command;
import java.io.PrintStream;
import java.util.Collections;
import java.util.ListIterator;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
import org.apache.hadoop.hdfs.tools.DiskBalancer;
import com.google.common.base.Preconditions;
/**
* Executes the report command.
*
* This command will report volume information for a specific DataNode or top X
* DataNode(s) benefiting from running DiskBalancer.
*
* This is done by reading the cluster info, sorting the DiskBalancerDataNodes by
* their NodeDataDensity and printing out the info.
*/
public class ReportCommand extends Command {
private PrintStream out;
public ReportCommand(Configuration conf, final PrintStream out) {
super(conf);
this.out = out;
addValidCommandParameters(DiskBalancer.REPORT,
"Report volume information of nodes.");
String desc = String.format(
"Top number of nodes to be processed. Default: %d", getDefaultTop());
addValidCommandParameters(DiskBalancer.TOP, desc);
desc = "Print out volume information for a DataNode.";
addValidCommandParameters(DiskBalancer.NODE, desc);
}
@Override
public void execute(CommandLine cmd) throws Exception {
StrBuilder result = new StrBuilder();
String outputLine = "Processing report command";
recordOutput(result, outputLine);
Preconditions.checkState(cmd.hasOption(DiskBalancer.REPORT));
verifyCommandOptions(DiskBalancer.REPORT, cmd);
readClusterInfo(cmd);
final String nodeFormat =
"%d/%d %s[%s:%d] - <%s>: %d volumes with node data density %.2f.";
final String nodeFormatWithoutSequence =
"%s[%s:%d] - <%s>: %d volumes with node data density %.2f.";
final String volumeFormat =
"[%s: volume-%s] - %.2f used: %d/%d, %.2f free: %d/%d, "
+ "isFailed: %s, isReadOnly: %s, isSkip: %s, isTransient: %s.";
if (cmd.hasOption(DiskBalancer.NODE)) {
/*
* Reporting volume information for a specific DataNode
*/
handleNodeReport(cmd, result, nodeFormatWithoutSequence, volumeFormat);
} else { // handle TOP
/*
* Reporting volume information for top X DataNode(s)
*/
handleTopReport(cmd, result, nodeFormat);
}
out.println(result.toString());
}
private void handleTopReport(final CommandLine cmd, final StrBuilder result,
final String nodeFormat) {
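/*
* DiskBalancerDataNodes sort by NodeDataDensity; reverse order puts the
* nodes that would benefit most from DiskBalancer at the front.
*/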
Collections.sort(getCluster().getNodes(), Collections.reverseOrder());
/* extract value that identifies top X DataNode(s) */
setTopNodes(parseTopNodes(cmd, result));
/*
* Reporting volume information of top X DataNode(s) in summary
*/
final String outputLine = String.format(
"Reporting top %d DataNode(s) benefiting from running DiskBalancer.",
getTopNodes());
recordOutput(result, outputLine);
ListIterator<DiskBalancerDataNode> li = getCluster().getNodes()
.listIterator();
for (int i = 0; i < getTopNodes() && li.hasNext(); i++) {
DiskBalancerDataNode dbdn = li.next();
result.appendln(String.format(nodeFormat,
i+1,
getTopNodes(),
dbdn.getDataNodeName(),
dbdn.getDataNodeIP(),
dbdn.getDataNodePort(),
dbdn.getDataNodeUUID(),
dbdn.getVolumeCount(),
dbdn.getNodeDataDensity()));
}
}
private void handleNodeReport(final CommandLine cmd, StrBuilder result,
final String nodeFormat, final String volumeFormat) {
String outputLine = "";
/*
* get value that identifies a DataNode from command line, it could be UUID,
* IP address or host name.
*/
final String nodeVal = cmd.getOptionValue(DiskBalancer.NODE);
if (StringUtils.isBlank(nodeVal)) {
outputLine = "The value for '-node' is neither specified or empty.";
recordOutput(result, outputLine);
} else {
/*
* Reporting volume information for a specific DataNode
*/
outputLine = String.format(
"Reporting volume information for DataNode '%s'.", nodeVal);
recordOutput(result, outputLine);
final String trueStr = "True";
final String falseStr = "False";
DiskBalancerDataNode dbdn = getNode(nodeVal);
if (dbdn == null) {
outputLine = String.format(
"Can't find a DataNode that matches '%s'.", nodeVal);
recordOutput(result, outputLine);
} else {
result.appendln(String.format(nodeFormat,
dbdn.getDataNodeName(),
dbdn.getDataNodeIP(),
dbdn.getDataNodePort(),
dbdn.getDataNodeUUID(),
dbdn.getVolumeCount(),
dbdn.getNodeDataDensity()));
for (DiskBalancerVolumeSet vset : dbdn.getVolumeSets().values()) {
for (DiskBalancerVolume vol : vset.getVolumes()) {
result.appendln(String.format(volumeFormat,
vol.getStorageType(),
vol.getPath(),
vol.getUsedRatio(),
vol.getUsed(),
vol.getCapacity(),
vol.getFreeRatio(),
vol.getFreeSpace(),
vol.getCapacity(),
vol.isFailed() ? trueStr : falseStr,
vol.isReadOnly() ? trueStr : falseStr,
vol.isSkip() ? trueStr : falseStr,
vol.isTransient() ? trueStr : falseStr));
}
}
}
}
}
@Override
protected String getHelp() {
return "Report volume information for a specific DataNode or top X "
+ "one(s) benefiting from running DiskBalancer, "
+ "top defaults to " + getDefaultTop() + ". E.g.:\n"
+ "hdfs diskbalancer -uri http://namenode.uri -report\n"
+ "hdfs diskbalancer -uri http://namenode.uri -report -top 5\n"
+ "hdfs diskbalancer -uri http://namenode.uri -report "
+ "-node {DataNodeID | IP | Hostname}";
}
}
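
As a usage sketch, the report output can also be captured in-process through the run(String[], PrintStream) overload this commit adds to DiskBalancer, mirroring the runCommand helper in TestDiskBalancerCommand below; the file:///tmp/cluster.json URI is a hypothetical placeholder:

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.tools.DiskBalancer;

public class ReportCommandExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    DiskBalancer db = new DiskBalancer(conf);
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    // Hypothetical connector URI; the tests below use a cluster-JSON file.
    String cmdLine = "hdfs diskbalancer -uri file:///tmp/cluster.json -report -top 5";
    db.run(cmdLine.split(" "), new PrintStream(buf));
    System.out.print(buf.toString());
  }
}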

org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java View File

@@ -139,6 +139,36 @@ public class DiskBalancerVolume {
return capacity;
}
/**
* Gets the free space of the volume.
*
* @return free space in bytes.
*/
@JsonIgnore
public long getFreeSpace() {
return getCapacity() - getUsed();
}
/**
* Gets the ratio between used space and capacity.
*
* @return used space divided by capacity.
*/
@JsonIgnore
public double getUsedRatio() {
return (1.0 * getUsed()) / getCapacity();
}
/**
* Gets the ratio between free space and capacity.
*
* @return free space divided by capacity.
*/
@JsonIgnore
public double getFreeRatio() {
return (1.0 * getFreeSpace()) / getCapacity();
}
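/*
* Worked example, using figures asserted in TestDiskBalancerCommand below:
* capacity = 400000000000 and used = 289544224916 give
* getFreeSpace() = 110455775084, getUsedRatio() ~ 0.72 and
* getFreeRatio() ~ 0.28.
*/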
/**
* Sets the capacity of this volume.
*

org/apache/hadoop/hdfs/tools/DiskBalancer.java View File

@@ -29,12 +29,14 @@ import org.apache.hadoop.hdfs.server.diskbalancer.command.Command;
import org.apache.hadoop.hdfs.server.diskbalancer.command.ExecuteCommand;
import org.apache.hadoop.hdfs.server.diskbalancer.command.PlanCommand;
import org.apache.hadoop.hdfs.server.diskbalancer.command.QueryCommand;
import org.apache.hadoop.hdfs.server.diskbalancer.command.ReportCommand;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URISyntaxException;
/**
@@ -93,6 +95,22 @@ public class DiskBalancer extends Configured implements Tool {
* Executes a given plan file on the target datanode.
*/
public static final String EXECUTE = "execute";
/**
* The report command prints out a disk fragmentation report about the
* cluster. By default it prints the DEFAULT_TOP machine names with the
* highest nodeDataDensity ({@link DiskBalancerDataNode#getNodeDataDensity})
* values, i.e. the nodes that deviate most from the ideal data
* distribution.
*/
public static final String REPORT = "report";
/**
* Specifies the top number of nodes to be processed.
*/
public static final String TOP = "top";
/**
* Default top number of nodes to be processed.
*/
public static final int DEFAULT_TOP = 100;
/**
* Name or address of the node to execute against.
*/
@@ -157,9 +175,21 @@
*/
@Override
public int run(String[] args) throws Exception {
return run(args, System.out);
}
/**
* Execute the command with the given arguments.
*
* @param args command specific arguments.
* @param out the output stream used for printing
* @return exit code.
* @throws Exception
*/
public int run(String[] args, final PrintStream out) throws Exception {
Options opts = getOpts();
CommandLine cmd = parseArgs(args, opts);
-return dispatch(cmd, opts);
+return dispatch(cmd, opts, out);
}
/**
@@ -173,6 +203,7 @@
addExecuteCommands(opts);
addQueryCommands(opts);
addCancelCommands(opts);
addReportCommands(opts);
return opts;
}
@@ -255,6 +286,26 @@
opt.addOption(node);
}
/**
* Adds report command options.
* @param opt Options
*/
private void addReportCommands(Options opt) {
Option report = new Option(REPORT, false,
"Report volume information of DataNode(s)"
+ " benefiting from running DiskBalancer. "
+ "-report [top -X] | [-node {DataNodeID | IP | Hostname}].");
opt.addOption(report);
Option top = new Option(TOP, true,
"specify the top number of nodes to be processed.");
opt.addOption(top);
Option node = new Option(NODE, true,
"Name of the datanode in the format of DataNodeID, IP or hostname.");
opt.addOption(node);
}
/**
* This function parses all command line arguments and returns the appropriate
* values.
@@ -272,10 +323,12 @@
* Dispatches calls to the right command Handler classes.
*
* @param cmd - CommandLine
* @param opts options of command line
* @param out the output stream used for printing
* @throws IOException
* @throws URISyntaxException
*/
-private int dispatch(CommandLine cmd, Options opts)
+private int dispatch(CommandLine cmd, Options opts, final PrintStream out)
throws IOException, URISyntaxException {
Command currentCommand = null;
@@ -297,6 +350,10 @@
currentCommand = new CancelCommand(getConf());
}
if (cmd.hasOption(DiskBalancer.REPORT)) {
currentCommand = new ReportCommand(getConf(), out);
}
if (currentCommand == null) {
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp(80, "hdfs diskbalancer -uri [args]",

org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java View File

@@ -0,0 +1,299 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.server.diskbalancer.command;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URI;
import java.util.List;
import java.util.Scanner;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
/**
* Tests various CLI commands of DiskBalancer.
*/
public class TestDiskBalancerCommand {
private MiniDFSCluster cluster;
private URI clusterJson;
@Before
public void setUp() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
.storagesPerDatanode(2).build();
cluster.waitActive();
clusterJson = getClass().getResource(
"/diskBalancer/data-cluster-64node-3disk.json").toURI();
}
@After
public void tearDown() throws Exception {
if (cluster != null) {
// Just make sure we can shutdown datanodes.
cluster.getDataNodes().get(0).shutdown();
cluster.shutdown();
}
}
private void testReportSimple() throws Exception {
final String cmdLine = String.format("hdfs diskbalancer -uri %s -report",
clusterJson.toString());
final List<String> outputs = runCommand(cmdLine);
assertThat(
outputs.get(0),
containsString("Processing report command"));
assertThat(
outputs.get(1),
is(allOf(containsString("No top limit specified"),
containsString("using default top value"), containsString("100"))));
assertThat(
outputs.get(2),
is(allOf(
containsString("Reporting top"),
containsString("64"),
containsString(
"DataNode(s) benefiting from running DiskBalancer"))));
assertThat(
outputs.get(32),
is(allOf(containsString("30/64 null[null:0]"),
containsString("a87654a9-54c7-4693-8dd9-c9c7021dc340"),
containsString("9 volumes with node data density 1.97"))));
}
private void testReportLessThanTotal() throws Exception {
final String cmdLine = String.format(
"hdfs diskbalancer -uri %s -report -top 32", clusterJson.toString());
final List<String> outputs = runCommand(cmdLine);
assertThat(
outputs.get(0),
containsString("Processing report command"));
assertThat(
outputs.get(1),
is(allOf(
containsString("Reporting top"),
containsString("32"),
containsString(
"DataNode(s) benefiting from running DiskBalancer"))));
assertThat(
outputs.get(31),
is(allOf(containsString("30/32 null[null:0]"),
containsString("a87654a9-54c7-4693-8dd9-c9c7021dc340"),
containsString("9 volumes with node data density 1.97"))));
}
private void testReportMoreThanTotal() throws Exception {
final String cmdLine = String.format(
"hdfs diskbalancer -uri %s -report -top 128", clusterJson.toString());
final List<String> outputs = runCommand(cmdLine);
assertThat(
outputs.get(0),
containsString("Processing report command"));
assertThat(
outputs.get(1),
is(allOf(
containsString("Reporting top"),
containsString("64"),
containsString(
"DataNode(s) benefiting from running DiskBalancer"))));
assertThat(
outputs.get(31),
is(allOf(containsString("30/64 null[null:0]"),
containsString("a87654a9-54c7-4693-8dd9-c9c7021dc340"),
containsString("9 volumes with node data density 1.97"))));
}
private void testReportInvalidTopLimit() throws Exception {
final String cmdLine = String.format(
"hdfs diskbalancer -uri %s -report -top xx", clusterJson.toString());
final List<String> outputs = runCommand(cmdLine);
assertThat(
outputs.get(0),
containsString("Processing report command"));
assertThat(
outputs.get(1),
is(allOf(containsString("Top limit input is not numeric"),
containsString("using default top value"), containsString("100"))));
assertThat(
outputs.get(2),
is(allOf(
containsString("Reporting top"),
containsString("64"),
containsString(
"DataNode(s) benefiting from running DiskBalancer"))));
assertThat(
outputs.get(32),
is(allOf(containsString("30/64 null[null:0]"),
containsString("a87654a9-54c7-4693-8dd9-c9c7021dc340"),
containsString("9 volumes with node data density 1.97"))));
}
private void testReportNode() throws Exception {
final String cmdLine = String
.format(
"hdfs diskbalancer -uri %s -report -node "
+ "a87654a9-54c7-4693-8dd9-c9c7021dc340",
clusterJson.toString());
final List<String> outputs = runCommand(cmdLine);
assertThat(
outputs.get(0),
containsString("Processing report command"));
assertThat(
outputs.get(1),
is(allOf(containsString("Reporting volume information for DataNode"),
containsString("a87654a9-54c7-4693-8dd9-c9c7021dc340"))));
assertThat(
outputs.get(2),
is(allOf(containsString("null[null:0]"),
containsString("a87654a9-54c7-4693-8dd9-c9c7021dc340"),
containsString("9 volumes with node data density 1.97"))));
assertThat(
outputs.get(3),
is(allOf(containsString("DISK"),
containsString("/tmp/disk/xx3j3ph3zd"),
containsString("0.72 used: 289544224916/400000000000"),
containsString("0.28 free: 110455775084/400000000000"))));
assertThat(
outputs.get(4),
is(allOf(containsString("DISK"),
containsString("/tmp/disk/Mxfcfmb24Y"),
containsString("0.92 used: 733099315216/800000000000"),
containsString("0.08 free: 66900684784/800000000000"))));
assertThat(
outputs.get(5),
is(allOf(containsString("DISK"),
containsString("DISK"),
containsString("/tmp/disk/KmHefYNURo"),
containsString("0.20 used: 39160240782/200000000000"),
containsString("0.80 free: 160839759218/200000000000"))));
assertThat(
outputs.get(6),
is(allOf(containsString("RAM_DISK"),
containsString("/tmp/disk/MXRyYsCz3U"),
containsString("0.55 used: 438102096853/800000000000"),
containsString("0.45 free: 361897903147/800000000000"))));
assertThat(
outputs.get(7),
is(allOf(containsString("RAM_DISK"),
containsString("/tmp/disk/DtmAygEU6f"),
containsString("0.34 used: 134602910470/400000000000"),
containsString("0.66 free: 265397089530/400000000000"))));
assertThat(
outputs.get(8),
is(allOf(containsString("RAM_DISK"),
containsString("/tmp/disk/BoBlQFxhfw"),
containsString("0.60 used: 477590453390/800000000000"),
containsString("0.40 free: 322409546610/800000000000"))));
assertThat(
outputs.get(9),
is(allOf(containsString("SSD"),
containsString("/tmp/disk/BGe09Y77dI"),
containsString("0.89 used: 890446265501/1000000000000"),
containsString("0.11 free: 109553734499/1000000000000"))));
assertThat(
outputs.get(10),
is(allOf(containsString("SSD"),
containsString("/tmp/disk/JX3H8iHggM"),
containsString("0.31 used: 2782614512957/9000000000000"),
containsString("0.69 free: 6217385487043/9000000000000"))));
assertThat(
outputs.get(11),
is(allOf(containsString("SSD"),
containsString("/tmp/disk/uLOYmVZfWV"),
containsString("0.75 used: 1509592146007/2000000000000"),
containsString("0.25 free: 490407853993/2000000000000"))));
}
@Test(timeout=60000)
public void testReportCommand() throws Exception {
/* test basic report */
testReportSimple();
/* test less than 64 DataNode(s) as total, e.g., -report -top 32 */
testReportLessThanTotal();
/* test more than 64 DataNode(s) as total, e.g., -report -top 128 */
testReportMoreThanTotal();
/* test invalid top limit, e.g., -report -top xx */
testReportInvalidTopLimit();
/* test -report -node DataNodeID */
testReportNode();
}
@Test
public void testReadClusterFromJson() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
ClusterConnector jsonConnector = ConnectorFactory.getCluster(clusterJson,
conf);
DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(
jsonConnector);
diskBalancerCluster.readClusterInfo();
assertEquals(64, diskBalancerCluster.getNodes().size());
}
private List<String> runCommand(final String cmdLine) throws Exception {
String[] cmds = StringUtils.split(cmdLine, ' ');
Configuration conf = new HdfsConfiguration();
org.apache.hadoop.hdfs.tools.DiskBalancer db =
new org.apache.hadoop.hdfs.tools.DiskBalancer(conf);
ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
PrintStream out = new PrintStream(bufOut);
db.run(cmds, out);
Scanner scanner = new Scanner(bufOut.toString());
List<String> outputs = Lists.newArrayList();
while (scanner.hasNextLine()) {
outputs.add(scanner.nextLine());
}
return outputs;
}
}