HDFS-9546: DiskBalancer: Add Execute command. Contributed by Anu Engineer.

This commit is contained in:
Anu Engineer 2016-05-13 10:52:58 -07:00 committed by Arpit Agarwal
parent 75882ec0b0
commit 1b39b283c7
4 changed files with 187 additions and 43 deletions

View File

@ -24,6 +24,7 @@ import org.apache.commons.cli.Option;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -81,8 +82,6 @@ public abstract class Command extends Configured {
public Command(Configuration conf) {
super(conf);
// These arguments are valid for all commands.
addValidCommandParameters(DiskBalancer.NAMENODEURI, "Name Node URI or " +
"file URI for cluster");
addValidCommandParameters(DiskBalancer.HELP, "Help for this command");
addValidCommandParameters("arg", "");
}
@ -348,9 +347,24 @@ public abstract class Command extends Configured {
* @return OutputStream.
*/
protected FSDataOutputStream create(String fileName) throws IOException {
Preconditions.checkNotNull(fileName);
if(fs == null) {
fs = FileSystem.get(getConf());
}
return fs.create(new Path(this.diskBalancerLogs, fileName));
}
/**
* Returns a InputStream to read data.
*/
protected FSDataInputStream open(String fileName) throws IOException {
Preconditions.checkNotNull(fileName);
if(fs == null) {
fs = FileSystem.get(getConf());
}
return fs.open(new Path(fileName));
}
/**
* Returns the output path where the plan and snapshot gets written.
*

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdfs.server.diskbalancer.command;
import com.google.common.base.Preconditions;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
import org.apache.hadoop.hdfs.tools.DiskBalancer;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
/**
* executes a given plan.
*/
public class ExecuteCommand extends Command {
/**
* Constructs ExecuteCommand.
*
* @param conf - Configuration.
*/
public ExecuteCommand(Configuration conf) {
super(conf);
addValidCommandParameters(DiskBalancer.EXECUTE, "Executes a given plan.");
addValidCommandParameters(DiskBalancer.NODE, "Name of the target node.");
}
/**
* Executes the Client Calls.
*
* @param cmd - CommandLine
*/
@Override
public void execute(CommandLine cmd) throws Exception {
LOG.info("Executing \"execute plan\" command");
Preconditions.checkState(cmd.hasOption(DiskBalancer.EXECUTE));
verifyCommandOptions(DiskBalancer.EXECUTE, cmd);
String planFile = cmd.getOptionValue(DiskBalancer.EXECUTE);
Preconditions.checkArgument(planFile == null || planFile.isEmpty(),
"Invalid plan file specified.");
String planData = null;
try (FSDataInputStream plan = open(planFile)) {
planData = IOUtils.toString(plan);
}
submitPlan(planData);
}
/**
* Submits plan to a given data node.
*
* @param planData - PlanData Json String.
* @throws IOException
*/
private void submitPlan(String planData) throws IOException {
Preconditions.checkNotNull(planData);
NodePlan plan = readPlan(planData);
String dataNodeAddress = plan.getNodeName() + ":" + plan.getPort();
Preconditions.checkNotNull(dataNodeAddress);
ClientDatanodeProtocol dataNode = getDataNodeProxy(dataNodeAddress);
String planHash = DigestUtils.sha512Hex(planData);
try {
dataNode.submitDiskBalancerPlan(planHash, DiskBalancer.PLAN_VERSION,
planData, false); // TODO : Support skipping date check.
} catch (DiskBalancerException ex) {
LOG.error("Submitting plan on {} failed. Result: {}, Message: {}",
plan.getNodeName(), ex.getResult().toString(), ex.getMessage());
throw ex;
}
}
/**
* Returns a plan from the Json Data.
*
* @param planData - Json String
* @return NodePlan
* @throws IOException
*/
private NodePlan readPlan(String planData) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(planData, NodePlan.class);
}
/**
* Gets extended help for this command.
*
* @return Help Message
*/
@Override
protected String getHelp() {
return "Execute command takes a plan and runs it against the node. e.g. " +
"hdfs diskbalancer -execute <nodename.plan.json> ";
}
}

View File

@ -24,7 +24,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.tools.DiskBalancer;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
import org.codehaus.jackson.map.ObjectMapper;
@ -54,6 +55,9 @@ public class PlanCommand extends Command {
this.thresholdPercentage = 1;
this.bandwidth = 0;
this.maxError = 0;
addValidCommandParameters(DiskBalancer.NAMENODEURI, "Name Node URI or " +
"file URI for cluster");
addValidCommandParameters(DiskBalancer.OUTFILE, "Output file");
addValidCommandParameters(DiskBalancer.BANDWIDTH, "Maximum Bandwidth to " +
"be used while copying.");
@ -61,7 +65,6 @@ public class PlanCommand extends Command {
"we tolerate before diskbalancer starts working.");
addValidCommandParameters(DiskBalancer.MAXERROR, "Max errors to tolerate " +
"between 2 disks");
addValidCommandParameters(DiskBalancer.NODE, "Name / Address of the node.");
addValidCommandParameters(DiskBalancer.VERBOSE, "Run plan command in " +
"verbose mode.");
}
@ -79,7 +82,7 @@ public class PlanCommand extends Command {
Preconditions.checkState(cmd.hasOption(DiskBalancer.PLAN));
verifyCommandOptions(DiskBalancer.PLAN, cmd);
if (!cmd.hasOption(DiskBalancer.NODE)) {
if (cmd.getOptionValue(DiskBalancer.PLAN) == null) {
throw new IllegalArgumentException("A node name is required to create a" +
" plan.");
}
@ -101,10 +104,11 @@ public class PlanCommand extends Command {
}
setOutputPath(output);
DiskBalancerDataNode node = getNode(cmd.getOptionValue(DiskBalancer.NODE));
// -plan nodename is the command line argument.
DiskBalancerDataNode node = getNode(cmd.getOptionValue(DiskBalancer.PLAN));
if (node == null) {
throw new IllegalArgumentException("Unable to find the specified node. " +
cmd.getOptionValue(DiskBalancer.NODE));
cmd.getOptionValue(DiskBalancer.PLAN));
}
this.thresholdPercentage = getThresholdPercentage(cmd);
setNodesToProcess(node);
@ -115,16 +119,16 @@ public class PlanCommand extends Command {
LOG.info("Writing plan to : {}", getOutputPath());
System.out.printf("Writing plan to : %s%n", getOutputPath());
try(FSDataOutputStream beforeStream = create(String.format(
try (FSDataOutputStream beforeStream = create(String.format(
DiskBalancer.BEFORE_TEMPLATE,
cmd.getOptionValue(DiskBalancer.NODE)))) {
cmd.getOptionValue(DiskBalancer.PLAN)))) {
beforeStream.write(getCluster().toJson()
.getBytes(StandardCharsets.UTF_8));
}
try(FSDataOutputStream planStream = create(String.format(
try (FSDataOutputStream planStream = create(String.format(
DiskBalancer.PLAN_TEMPLATE,
cmd.getOptionValue(DiskBalancer.NODE)))) {
cmd.getOptionValue(DiskBalancer.PLAN)))) {
planStream.write(getPlan(plans).getBytes(StandardCharsets.UTF_8));
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.diskbalancer.command.Command;
import org.apache.hadoop.hdfs.server.diskbalancer.command.ExecuteCommand;
import org.apache.hadoop.hdfs.server.diskbalancer.command.PlanCommand;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@ -47,15 +48,6 @@ import java.net.URISyntaxException;
* utilization equal and then those moves are executed by the datanode.
*/
public class DiskBalancer extends Configured implements Tool {
/**
* Construct a DiskBalancer.
*
* @param conf
*/
public DiskBalancer(Configuration conf) {
super(conf);
}
/**
* NameNodeURI can point to either a real namenode, or a json file that
* contains the diskBalancer data in json form, that jsonNodeConnector knows
@ -66,12 +58,10 @@ public class DiskBalancer extends Configured implements Tool {
* hdfs://namenode.uri or file:///data/myCluster.json
*/
public static final String NAMENODEURI = "uri";
/**
* Computes a plan for a given set of nodes.
*/
public static final String PLAN = "plan";
/**
* Output file name, for commands like report, plan etc. This is an optional
* argument, by default diskbalancer will write all its output to
@ -79,52 +69,57 @@ public class DiskBalancer extends Configured implements Tool {
* against.
*/
public static final String OUTFILE = "out";
/**
* Help for the program.
*/
public static final String HELP = "help";
/**
* Percentage of data unevenness that we are willing to live with. For example
* - a value like 10 indicates that we are okay with 10 % +/- from
* idealStorage Target.
*/
public static final String THRESHOLD = "thresholdPercentage";
/**
* Specifies the maximum disk bandwidth to use per second.
*/
public static final String BANDWIDTH = "bandwidth";
/**
* Specifies the maximum errors to tolerate.
*/
public static final String MAXERROR = "maxerror";
/**
* Node name or IP against which Disk Balancer is being run.
* Executes a given plan file on the target datanode.
*/
public static final String EXECUTE = "execute";
/**
* Name or address of the node to execute against.
*/
public static final String NODE = "node";
/**
* Runs the command in verbose mode.
*/
public static final String VERBOSE = "v";
public static final int PLAN_VERSION = 1;
/**
* Template for the Before File. It is node.before.json.
*/
public static final String BEFORE_TEMPLATE = "%s.before.json";
/**
* Template for the plan file. it is node.plan.json.
*/
public static final String PLAN_TEMPLATE = "%s.plan.json";
private static final Logger LOG =
LoggerFactory.getLogger(DiskBalancer.class);
/**
* Construct a DiskBalancer.
*
* @param conf
*/
public DiskBalancer(Configuration conf) {
super(conf);
}
/**
* Main for the DiskBalancer Command handling.
*
@ -164,16 +159,16 @@ public class DiskBalancer extends Configured implements Tool {
*/
private Options getOpts() {
Options opts = new Options();
addCommands(opts);
addPlanCommands(opts);
return opts;
}
/**
* Adds commands that we handle to opts.
* Adds commands for plan command.
*
* @param opt - Optins
* @param opt - Options
*/
private void addCommands(Options opt) {
private void addPlanCommands(Options opt) {
Option nameNodeUri =
new Option(NAMENODEURI, true, "NameNode URI. e.g http://namenode" +
@ -187,7 +182,8 @@ public class DiskBalancer extends Configured implements Tool {
"e.g -out outfile.txt");
opt.addOption(outFile);
Option plan = new Option(PLAN, false, "write plan to the default file");
Option plan = new Option(PLAN, true , "create a plan for the given node. " +
"e.g -plan <nodename> | <nodeIP> | <nodeUUID>");
opt.addOption(plan);
Option bandwidth = new Option(BANDWIDTH, true, "Maximum disk bandwidth to" +
@ -204,13 +200,19 @@ public class DiskBalancer extends Configured implements Tool {
"can be tolerated while copying between a pair of disks.");
opt.addOption(maxErrors);
Option node = new Option(NODE, true, "Node Name or IP");
opt.addOption(node);
Option help =
new Option(HELP, true, "Help about a command or this message");
opt.addOption(help);
}
/**
* Adds execute command options.
* @param opt Options
*/
private void addExecuteCommands(Options opt) {
Option execute = new Option(EXECUTE, true , "Takes a plan file and " +
"submits it for execution to the datanode. e.g -execute <planfile>");
opt.addOption(execute);
}
/**
@ -238,18 +240,23 @@ public class DiskBalancer extends Configured implements Tool {
Command currentCommand = null;
try {
if (cmd.hasOption(DiskBalancer.PLAN)) {
currentCommand = new PlanCommand(getConf());
} else {
}
if(cmd.hasOption(DiskBalancer.EXECUTE)) {
currentCommand = new ExecuteCommand(getConf());
}
if(currentCommand == null) {
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp(80, "hdfs diskbalancer -uri [args]",
"disk balancer commands", opts,
"Please correct your command and try again.");
return 1;
}
currentCommand.execute(cmd);
} catch (Exception ex) {
System.err.printf(ex.getMessage());
return 1;