diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/CancelCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/CancelCommand.java new file mode 100644 index 00000000000..f39580281d6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/CancelCommand.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.hdfs.server.diskbalancer.command; + +import com.google.common.base.Preconditions; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException; +import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; +import org.apache.hadoop.hdfs.tools.DiskBalancer; + +import java.io.IOException; + +/** + * Cancels a running plan. + */ +public class CancelCommand extends Command { + /** + * Contructs a cancel Command. + * + * @param conf - Conf + */ + public CancelCommand(Configuration conf) { + super(conf); + addValidCommandParameters(DiskBalancer.CANCEL, "Cancels a running plan."); + addValidCommandParameters(DiskBalancer.NODE, "Node to run the command " + + "against in node:port format."); + } + + /** + * Executes the Client Calls. + * + * @param cmd - CommandLine + */ + @Override + public void execute(CommandLine cmd) throws Exception { + LOG.info("Executing \"Cancel plan\" command."); + Preconditions.checkState(cmd.hasOption(DiskBalancer.CANCEL)); + verifyCommandOptions(DiskBalancer.CANCEL, cmd); + + // We can cancel a plan using datanode address and plan ID + // that you can read from a datanode using queryStatus + if(cmd.hasOption(DiskBalancer.NODE)) { + String nodeAddress = cmd.getOptionValue(DiskBalancer.NODE); + String planHash = cmd.getOptionValue(DiskBalancer.CANCEL); + cancelPlanUsingHash(nodeAddress, planHash); + } else { + // Or you can cancel a plan using the plan file. If the user + // points us to the plan file, we can compute the hash as well as read + // the address of the datanode from the plan file. + String planFile = cmd.getOptionValue(DiskBalancer.CANCEL); + Preconditions.checkArgument(planFile == null || planFile.isEmpty(), + "Invalid plan file specified."); + String planData = null; + try (FSDataInputStream plan = open(planFile)) { + planData = IOUtils.toString(plan); + } + cancelPlan(planData); + } + } + + /** + * Cancels a running plan. + * + * @param planData - Plan data. + * @throws IOException + */ + private void cancelPlan(String planData) throws IOException { + Preconditions.checkNotNull(planData); + NodePlan plan = readPlan(planData); + String dataNodeAddress = plan.getNodeName() + ":" + plan.getPort(); + Preconditions.checkNotNull(dataNodeAddress); + ClientDatanodeProtocol dataNode = getDataNodeProxy(dataNodeAddress); + String planHash = DigestUtils.sha512Hex(planData); + try { + dataNode.cancelDiskBalancePlan(planHash); + } catch (DiskBalancerException ex) { + LOG.error("Cancelling plan on {} failed. Result: {}, Message: {}", + plan.getNodeName(), ex.getResult().toString(), ex.getMessage()); + throw ex; + } + } + + /** + * Cancels a running plan. + * @param nodeAddress - Address of the data node. + * @param hash - Sha512 hash of the plan, which can be read from datanode + * using query status command. + * @throws IOException + */ + private void cancelPlanUsingHash(String nodeAddress, String hash) throws + IOException { + Preconditions.checkNotNull(nodeAddress); + Preconditions.checkNotNull(hash); + ClientDatanodeProtocol dataNode = getDataNodeProxy(nodeAddress); + try { + dataNode.cancelDiskBalancePlan(hash); + } catch (DiskBalancerException ex) { + LOG.error("Cancelling plan on {} failed. Result: {}, Message: {}", + nodeAddress, ex.getResult().toString(), ex.getMessage()); + throw ex; + } + } + + + /** + * Gets extended help for this command. + * + * @return Help Message + */ + @Override + protected String getHelp() { + return "Cancels a running command. e.g -cancel or -cancel " + + " -node "; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java index 919d5493f70..feee977fc95 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; import org.apache.hadoop.hdfs.tools.DiskBalancer; import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector; import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory; @@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -392,4 +394,16 @@ public abstract class Command extends Configured { protected DiskBalancerCluster getCluster() { return cluster; } + + /** + * Returns a plan from the Json Data. + * + * @param planData - Json String + * @return NodePlan + * @throws IOException + */ + protected NodePlan readPlan(String planData) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(planData, NodePlan.class); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java index 1f7e81ffd99..c17ef00631d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java @@ -27,9 +27,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException; -import org.apache.hadoop.hdfs.tools.DiskBalancer; import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; -import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.hdfs.tools.DiskBalancer; + import java.io.IOException; @@ -94,17 +94,7 @@ public class ExecuteCommand extends Command { } } - /** - * Returns a plan from the Json Data. - * - * @param planData - Json String - * @return NodePlan - * @throws IOException - */ - private NodePlan readPlan(String planData) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(planData, NodePlan.class); - } + /** * Gets extended help for this command. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java index f5dbe4ebf8d..4005652929e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java @@ -24,6 +24,7 @@ import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.diskbalancer.command.CancelCommand; import org.apache.hadoop.hdfs.server.diskbalancer.command.Command; import org.apache.hadoop.hdfs.server.diskbalancer.command.ExecuteCommand; import org.apache.hadoop.hdfs.server.diskbalancer.command.PlanCommand; @@ -105,7 +106,10 @@ public class DiskBalancer extends Configured implements Tool { * Reports the status of disk balancer operation. */ public static final String QUERY = "query"; - + /** + * Cancels a running plan. + */ + public static final String CANCEL = "cancel"; /** * Template for the Before File. It is node.before.json. */ @@ -168,6 +172,7 @@ public class DiskBalancer extends Configured implements Tool { addPlanCommands(opts); addExecuteCommands(opts); addQueryCommands(opts); + addCancelCommands(opts); return opts; } @@ -233,6 +238,19 @@ public class DiskBalancer extends Configured implements Tool { opt.addOption(query); } + /** + * Adds cancel command options. + * @param opt Options + */ + private void addCancelCommands(Options opt) { + Option cancel = new Option(CANCEL, true, "Cancels a running plan. -cancel" + + " or -cancel -node "); + opt.addOption(cancel); + Option node = new Option(NODE, true, "Name of the datanode in name:port " + + "format"); + opt.addOption(node); + } + /** * This function parses all command line arguments and returns the appropriate * values. @@ -271,6 +289,10 @@ public class DiskBalancer extends Configured implements Tool { currentCommand = new QueryCommand(getConf()); } + if(cmd.hasOption(DiskBalancer.CANCEL)) { + currentCommand = new CancelCommand(getConf()); + } + if(currentCommand == null) { HelpFormatter helpFormatter = new HelpFormatter(); helpFormatter.printHelp(80, "hdfs diskbalancer -uri [args]",