HDFS-13178. Disk Balancer: Add a force option to DiskBalancer Execute command. Contributed by Bharat Viswanadham.
This commit is contained in:
parent
ba82e5c488
commit
a3c304c5dd
|
@ -3509,7 +3509,7 @@ public class DataNode extends ReconfigurableBase
|
||||||
+ " Disk balancing not permitted.",
|
+ " Disk balancing not permitted.",
|
||||||
DiskBalancerException.Result.DATANODE_STATUS_NOT_REGULAR);
|
DiskBalancerException.Result.DATANODE_STATUS_NOT_REGULAR);
|
||||||
}
|
}
|
||||||
// TODO : Support force option
|
|
||||||
this.diskBalancer.submitPlan(planID, planVersion, planFile, planData,
|
this.diskBalancer.submitPlan(planID, planVersion, planFile, planData,
|
||||||
skipDateCheck);
|
skipDateCheck);
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,6 +48,8 @@ public class ExecuteCommand extends Command {
|
||||||
super(conf);
|
super(conf);
|
||||||
addValidCommandParameters(DiskBalancerCLI.EXECUTE,
|
addValidCommandParameters(DiskBalancerCLI.EXECUTE,
|
||||||
"Executes a given plan.");
|
"Executes a given plan.");
|
||||||
|
addValidCommandParameters(DiskBalancerCLI.SKIPDATECHECK,
|
||||||
|
"skips the date check and force execute the plan");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -69,7 +71,16 @@ public class ExecuteCommand extends Command {
|
||||||
try (FSDataInputStream plan = open(planFile)) {
|
try (FSDataInputStream plan = open(planFile)) {
|
||||||
planData = IOUtils.toString(plan);
|
planData = IOUtils.toString(plan);
|
||||||
}
|
}
|
||||||
submitPlan(planFile, planData);
|
|
||||||
|
boolean skipDateCheck = false;
|
||||||
|
if(cmd.hasOption(DiskBalancerCLI.SKIPDATECHECK)) {
|
||||||
|
skipDateCheck = true;
|
||||||
|
LOG.warn("Skipping date check on this plan. This could mean we are " +
|
||||||
|
"executing an old plan and may not be the right plan for this " +
|
||||||
|
"data node.");
|
||||||
|
}
|
||||||
|
|
||||||
|
submitPlan(planFile, planData, skipDateCheck);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -77,9 +88,11 @@ public class ExecuteCommand extends Command {
|
||||||
*
|
*
|
||||||
* @param planFile - Plan file name
|
* @param planFile - Plan file name
|
||||||
* @param planData - Plan data in json format
|
* @param planData - Plan data in json format
|
||||||
|
* @param skipDateCheck - skips date check
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void submitPlan(final String planFile, final String planData)
|
private void submitPlan(final String planFile, final String planData,
|
||||||
|
boolean skipDateCheck)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Preconditions.checkNotNull(planData);
|
Preconditions.checkNotNull(planData);
|
||||||
NodePlan plan = NodePlan.parseJson(planData);
|
NodePlan plan = NodePlan.parseJson(planData);
|
||||||
|
@ -88,9 +101,8 @@ public class ExecuteCommand extends Command {
|
||||||
ClientDatanodeProtocol dataNode = getDataNodeProxy(dataNodeAddress);
|
ClientDatanodeProtocol dataNode = getDataNodeProxy(dataNodeAddress);
|
||||||
String planHash = DigestUtils.shaHex(planData);
|
String planHash = DigestUtils.shaHex(planData);
|
||||||
try {
|
try {
|
||||||
// TODO : Support skipping date check.
|
|
||||||
dataNode.submitDiskBalancerPlan(planHash, DiskBalancerCLI.PLAN_VERSION,
|
dataNode.submitDiskBalancerPlan(planHash, DiskBalancerCLI.PLAN_VERSION,
|
||||||
planFile, planData, false);
|
planFile, planData, skipDateCheck);
|
||||||
} catch (DiskBalancerException ex) {
|
} catch (DiskBalancerException ex) {
|
||||||
LOG.error("Submitting plan on {} failed. Result: {}, Message: {}",
|
LOG.error("Submitting plan on {} failed. Result: {}, Message: {}",
|
||||||
plan.getNodeName(), ex.getResult().toString(), ex.getMessage());
|
plan.getNodeName(), ex.getResult().toString(), ex.getMessage());
|
||||||
|
|
|
@ -84,6 +84,13 @@ public class DiskBalancerCLI extends Configured implements Tool {
|
||||||
* Executes a given plan file on the target datanode.
|
* Executes a given plan file on the target datanode.
|
||||||
*/
|
*/
|
||||||
public static final String EXECUTE = "execute";
|
public static final String EXECUTE = "execute";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Skips date check(now by default the plan is valid for 24 hours), and force
|
||||||
|
* execute the plan.
|
||||||
|
*/
|
||||||
|
public static final String SKIPDATECHECK = "skipDateCheck";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The report command prints out a disk fragmentation report about the data
|
* The report command prints out a disk fragmentation report about the data
|
||||||
* cluster. By default it prints the DEFAULT_TOP machines names with high
|
* cluster. By default it prints the DEFAULT_TOP machines names with high
|
||||||
|
@ -342,7 +349,15 @@ public class DiskBalancerCLI extends Configured implements Tool {
|
||||||
"submits it for execution by the datanode.")
|
"submits it for execution by the datanode.")
|
||||||
.create();
|
.create();
|
||||||
getExecuteOptions().addOption(execute);
|
getExecuteOptions().addOption(execute);
|
||||||
|
|
||||||
|
|
||||||
|
Option skipDateCheck = OptionBuilder.withLongOpt(SKIPDATECHECK)
|
||||||
|
.withDescription("skips the date check and force execute the plan")
|
||||||
|
.create();
|
||||||
|
getExecuteOptions().addOption(skipDateCheck);
|
||||||
|
|
||||||
opt.addOption(execute);
|
opt.addOption(execute);
|
||||||
|
opt.addOption(skipDateCheck);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.OUTFILE;
|
||||||
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.PLAN;
|
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.PLAN;
|
||||||
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.QUERY;
|
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.QUERY;
|
||||||
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.REPORT;
|
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.REPORT;
|
||||||
|
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.SKIPDATECHECK;
|
||||||
import static org.hamcrest.CoreMatchers.allOf;
|
import static org.hamcrest.CoreMatchers.allOf;
|
||||||
import static org.hamcrest.CoreMatchers.containsString;
|
import static org.hamcrest.CoreMatchers.containsString;
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
|
@ -276,6 +277,45 @@ public class TestDiskBalancerCommand {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 600000)
|
||||||
|
public void testDiskBalancerForceExecute() throws
|
||||||
|
Exception {
|
||||||
|
final int numDatanodes = 1;
|
||||||
|
|
||||||
|
final Configuration hdfsConf = new HdfsConfiguration();
|
||||||
|
hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
|
||||||
|
hdfsConf.set(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "0d");
|
||||||
|
|
||||||
|
/* new cluster with imbalanced capacity */
|
||||||
|
final MiniDFSCluster miniCluster = DiskBalancerTestUtil.
|
||||||
|
newImbalancedCluster(
|
||||||
|
hdfsConf,
|
||||||
|
numDatanodes,
|
||||||
|
CAPACITIES,
|
||||||
|
DEFAULT_BLOCK_SIZE,
|
||||||
|
FILE_LEN);
|
||||||
|
|
||||||
|
try {
|
||||||
|
/* get full path of plan */
|
||||||
|
final String planFileFullName = runAndVerifyPlan(miniCluster, hdfsConf);
|
||||||
|
|
||||||
|
/* run execute command */
|
||||||
|
final String cmdLine = String.format(
|
||||||
|
"hdfs diskbalancer -%s %s -%s",
|
||||||
|
EXECUTE,
|
||||||
|
planFileFullName,
|
||||||
|
SKIPDATECHECK);
|
||||||
|
|
||||||
|
// Disk Balancer should execute the plan, as skipDateCheck Option is
|
||||||
|
// specified
|
||||||
|
runCommand(cmdLine, hdfsConf, miniCluster);
|
||||||
|
} finally{
|
||||||
|
if (miniCluster != null) {
|
||||||
|
miniCluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test(timeout = 600000)
|
@Test(timeout = 600000)
|
||||||
public void testDiskBalancerExecuteOptionPlanValidity() throws Exception {
|
public void testDiskBalancerExecuteOptionPlanValidity() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue