HDFS-13181. DiskBalancer: Add an configuration for valid plan hours . Contributed by Bharat Viswanadham.
This commit is contained in:
parent
b22eaa05a1
commit
7da0b78cf2
|
@ -1157,6 +1157,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
"dfs.disk.balancer.max.disk.errors";
|
"dfs.disk.balancer.max.disk.errors";
|
||||||
public static final int DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT = 5;
|
public static final int DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT = 5;
|
||||||
|
|
||||||
|
public static final String DFS_DISK_BALANCER_PLAN_VALID_INTERVAL =
|
||||||
|
"dfs.disk.balancer.plan.valid.interval";
|
||||||
|
public static final String DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT =
|
||||||
|
"1d";
|
||||||
|
|
||||||
|
|
||||||
public static final String DFS_DISK_BALANCER_BLOCK_TOLERANCE =
|
public static final String DFS_DISK_BALANCER_BLOCK_TOLERANCE =
|
||||||
"dfs.disk.balancer.block.tolerance.percent";
|
"dfs.disk.balancer.block.tolerance.percent";
|
||||||
|
|
|
@ -91,6 +91,8 @@ public class DiskBalancer {
|
||||||
private String planFile;
|
private String planFile;
|
||||||
private DiskBalancerWorkStatus.Result currentResult;
|
private DiskBalancerWorkStatus.Result currentResult;
|
||||||
private long bandwidth;
|
private long bandwidth;
|
||||||
|
private long planValidityInterval;
|
||||||
|
private final Configuration config;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a Disk Balancer object. This object takes care of reading a
|
* Constructs a Disk Balancer object. This object takes care of reading a
|
||||||
|
@ -102,6 +104,7 @@ public class DiskBalancer {
|
||||||
*/
|
*/
|
||||||
public DiskBalancer(String dataNodeUUID,
|
public DiskBalancer(String dataNodeUUID,
|
||||||
Configuration conf, BlockMover blockMover) {
|
Configuration conf, BlockMover blockMover) {
|
||||||
|
this.config = conf;
|
||||||
this.currentResult = Result.NO_PLAN;
|
this.currentResult = Result.NO_PLAN;
|
||||||
this.blockMover = blockMover;
|
this.blockMover = blockMover;
|
||||||
this.dataset = this.blockMover.getDataset();
|
this.dataset = this.blockMover.getDataset();
|
||||||
|
@ -117,6 +120,10 @@ public class DiskBalancer {
|
||||||
this.bandwidth = conf.getInt(
|
this.bandwidth = conf.getInt(
|
||||||
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT,
|
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT,
|
||||||
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT_DEFAULT);
|
DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT_DEFAULT);
|
||||||
|
this.planValidityInterval = conf.getTimeDuration(
|
||||||
|
DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
|
||||||
|
DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT,
|
||||||
|
TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -417,15 +424,17 @@ public class DiskBalancer {
|
||||||
long now = Time.now();
|
long now = Time.now();
|
||||||
long planTime = plan.getTimeStamp();
|
long planTime = plan.getTimeStamp();
|
||||||
|
|
||||||
// TODO : Support Valid Plan hours as a user configurable option.
|
if ((planTime + planValidityInterval) < now) {
|
||||||
if ((planTime +
|
String planValidity = config.get(
|
||||||
(TimeUnit.HOURS.toMillis(
|
DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
|
||||||
DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS))) < now) {
|
DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT);
|
||||||
String hourString = "Plan was generated more than " +
|
if (planValidity.matches("[0-9]$")) {
|
||||||
Integer.toString(DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS)
|
planValidity += "ms";
|
||||||
+ " hours ago.";
|
}
|
||||||
LOG.error("Disk Balancer - " + hourString);
|
String errorString = "Plan was generated more than " + planValidity
|
||||||
throw new DiskBalancerException(hourString,
|
+ " ago";
|
||||||
|
LOG.error("Disk Balancer - " + errorString);
|
||||||
|
throw new DiskBalancerException(errorString,
|
||||||
DiskBalancerException.Result.OLD_PLAN_SUBMITTED);
|
DiskBalancerException.Result.OLD_PLAN_SUBMITTED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,11 +33,6 @@ public final class DiskBalancerConstants {
|
||||||
public static final int DISKBALANCER_MIN_VERSION = 1;
|
public static final int DISKBALANCER_MIN_VERSION = 1;
|
||||||
public static final int DISKBALANCER_MAX_VERSION = 1;
|
public static final int DISKBALANCER_MAX_VERSION = 1;
|
||||||
|
|
||||||
/**
|
|
||||||
* We treat a plan as stale if it was generated before the hours
|
|
||||||
* defined by the constant below. Defaults to 24 hours.
|
|
||||||
*/
|
|
||||||
public static final int DISKBALANCER_VALID_PLAN_HOURS = 24;
|
|
||||||
// never constructed.
|
// never constructed.
|
||||||
private DiskBalancerConstants() {
|
private DiskBalancerConstants() {
|
||||||
}
|
}
|
||||||
|
|
|
@ -4630,6 +4630,17 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.disk.balancer.plan.valid.interval</name>
|
||||||
|
<value>1d</value>
|
||||||
|
<description>
|
||||||
|
Maximum number of hours the disk balancer plan is valid.
|
||||||
|
This setting supports multiple time unit suffixes as described
|
||||||
|
in dfs.heartbeat.interval. If no suffix is specified then milliseconds
|
||||||
|
is assumed.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.disk.balancer.enabled</name>
|
<name>dfs.disk.balancer.enabled</name>
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hdfs.server.diskbalancer.command;
|
package org.apache.hadoop.hdfs.server.diskbalancer.command;
|
||||||
|
|
||||||
|
|
||||||
|
import static java.lang.Thread.sleep;
|
||||||
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.CANCEL;
|
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.CANCEL;
|
||||||
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.EXECUTE;
|
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.EXECUTE;
|
||||||
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.HELP;
|
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.HELP;
|
||||||
|
@ -58,6 +59,7 @@ import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode
|
||||||
import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
|
import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
import org.apache.hadoop.test.PathUtils;
|
import org.apache.hadoop.test.PathUtils;
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
@ -187,6 +189,132 @@ public class TestDiskBalancerCommand {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Test(timeout = 600000)
|
||||||
|
public void testDiskBalancerExecuteOptionPlanValidityWithException() throws
|
||||||
|
Exception {
|
||||||
|
final int numDatanodes = 1;
|
||||||
|
|
||||||
|
final Configuration hdfsConf = new HdfsConfiguration();
|
||||||
|
hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
|
||||||
|
hdfsConf.set(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "0d");
|
||||||
|
|
||||||
|
/* new cluster with imbalanced capacity */
|
||||||
|
final MiniDFSCluster miniCluster = DiskBalancerTestUtil.
|
||||||
|
newImbalancedCluster(
|
||||||
|
hdfsConf,
|
||||||
|
numDatanodes,
|
||||||
|
CAPACITIES,
|
||||||
|
DEFAULT_BLOCK_SIZE,
|
||||||
|
FILE_LEN);
|
||||||
|
|
||||||
|
try {
|
||||||
|
/* get full path of plan */
|
||||||
|
final String planFileFullName = runAndVerifyPlan(miniCluster, hdfsConf);
|
||||||
|
|
||||||
|
/* run execute command */
|
||||||
|
final String cmdLine = String.format(
|
||||||
|
"hdfs diskbalancer -%s %s",
|
||||||
|
EXECUTE,
|
||||||
|
planFileFullName);
|
||||||
|
|
||||||
|
LambdaTestUtils.intercept(
|
||||||
|
RemoteException.class,
|
||||||
|
"DiskBalancerException",
|
||||||
|
"Plan was generated more than 0d ago",
|
||||||
|
() -> {
|
||||||
|
runCommand(cmdLine, hdfsConf, miniCluster);
|
||||||
|
});
|
||||||
|
} finally{
|
||||||
|
if (miniCluster != null) {
|
||||||
|
miniCluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 600000)
|
||||||
|
public void testDiskBalancerExecutePlanValidityWithOutUnitException()
|
||||||
|
throws
|
||||||
|
Exception {
|
||||||
|
final int numDatanodes = 1;
|
||||||
|
|
||||||
|
final Configuration hdfsConf = new HdfsConfiguration();
|
||||||
|
hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
|
||||||
|
hdfsConf.set(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "0");
|
||||||
|
|
||||||
|
/* new cluster with imbalanced capacity */
|
||||||
|
final MiniDFSCluster miniCluster = DiskBalancerTestUtil.
|
||||||
|
newImbalancedCluster(
|
||||||
|
hdfsConf,
|
||||||
|
numDatanodes,
|
||||||
|
CAPACITIES,
|
||||||
|
DEFAULT_BLOCK_SIZE,
|
||||||
|
FILE_LEN);
|
||||||
|
|
||||||
|
try {
|
||||||
|
/* get full path of plan */
|
||||||
|
final String planFileFullName = runAndVerifyPlan(miniCluster, hdfsConf);
|
||||||
|
|
||||||
|
/* run execute command */
|
||||||
|
final String cmdLine = String.format(
|
||||||
|
"hdfs diskbalancer -%s %s",
|
||||||
|
EXECUTE,
|
||||||
|
planFileFullName);
|
||||||
|
|
||||||
|
LambdaTestUtils.intercept(
|
||||||
|
RemoteException.class,
|
||||||
|
"DiskBalancerException",
|
||||||
|
"Plan was generated more than 0ms ago",
|
||||||
|
() -> {
|
||||||
|
runCommand(cmdLine, hdfsConf, miniCluster);
|
||||||
|
});
|
||||||
|
} finally{
|
||||||
|
if (miniCluster != null) {
|
||||||
|
miniCluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test(timeout = 600000)
|
||||||
|
public void testDiskBalancerExecuteOptionPlanValidity() throws Exception {
|
||||||
|
final int numDatanodes = 1;
|
||||||
|
|
||||||
|
final Configuration hdfsConf = new HdfsConfiguration();
|
||||||
|
hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
|
||||||
|
hdfsConf.set(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "600s");
|
||||||
|
|
||||||
|
/* new cluster with imbalanced capacity */
|
||||||
|
final MiniDFSCluster miniCluster = DiskBalancerTestUtil.
|
||||||
|
newImbalancedCluster(
|
||||||
|
hdfsConf,
|
||||||
|
numDatanodes,
|
||||||
|
CAPACITIES,
|
||||||
|
DEFAULT_BLOCK_SIZE,
|
||||||
|
FILE_LEN);
|
||||||
|
|
||||||
|
try {
|
||||||
|
/* get full path of plan */
|
||||||
|
final String planFileFullName = runAndVerifyPlan(miniCluster, hdfsConf);
|
||||||
|
|
||||||
|
/* run execute command */
|
||||||
|
final String cmdLine = String.format(
|
||||||
|
"hdfs diskbalancer -%s %s",
|
||||||
|
EXECUTE,
|
||||||
|
planFileFullName);
|
||||||
|
|
||||||
|
// Plan is valid for 600 seconds, sleeping for 10seconds, so now
|
||||||
|
// diskbalancer should execute the plan
|
||||||
|
sleep(10000);
|
||||||
|
runCommand(cmdLine, hdfsConf, miniCluster);
|
||||||
|
} finally{
|
||||||
|
if (miniCluster != null) {
|
||||||
|
miniCluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private String runAndVerifyPlan(
|
private String runAndVerifyPlan(
|
||||||
final MiniDFSCluster miniCluster,
|
final MiniDFSCluster miniCluster,
|
||||||
final Configuration hdfsConf) throws Exception {
|
final Configuration hdfsConf) throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue