HDFS-9703. DiskBalancer: getBandwidth implementation. (Contributed by Anu Engineer)
This commit is contained in:
parent
918722bdd2
commit
75a711a2d5
|
@ -1031,7 +1031,7 @@ public class DataNode extends ReconfigurableBase
|
||||||
* @param data - FSDataSet
|
* @param data - FSDataSet
|
||||||
* @param conf - Config
|
* @param conf - Config
|
||||||
*/
|
*/
|
||||||
private synchronized void initDiskBalancer(FsDatasetSpi data,
|
private void initDiskBalancer(FsDatasetSpi data,
|
||||||
Configuration conf) {
|
Configuration conf) {
|
||||||
if (this.diskBalancer != null) {
|
if (this.diskBalancer != null) {
|
||||||
return;
|
return;
|
||||||
|
@ -1045,7 +1045,7 @@ public class DataNode extends ReconfigurableBase
|
||||||
/**
|
/**
|
||||||
* Shutdown disk balancer.
|
* Shutdown disk balancer.
|
||||||
*/
|
*/
|
||||||
private synchronized void shutdownDiskBalancer() {
|
private void shutdownDiskBalancer() {
|
||||||
if (this.diskBalancer != null) {
|
if (this.diskBalancer != null) {
|
||||||
this.diskBalancer.shutdown();
|
this.diskBalancer.shutdown();
|
||||||
this.diskBalancer = null;
|
this.diskBalancer = null;
|
||||||
|
@ -3375,6 +3375,8 @@ public class DataNode extends ReconfigurableBase
|
||||||
switch (key) {
|
switch (key) {
|
||||||
case DiskBalancerConstants.DISKBALANCER_VOLUME_NAME:
|
case DiskBalancerConstants.DISKBALANCER_VOLUME_NAME:
|
||||||
return this.diskBalancer.getVolumeNames();
|
return this.diskBalancer.getVolumeNames();
|
||||||
|
case DiskBalancerConstants.DISKBALANCER_BANDWIDTH :
|
||||||
|
return Long.toString(this.diskBalancer.getBandwidth());
|
||||||
default:
|
default:
|
||||||
LOG.error("Disk Balancer - Unknown key in get balancer setting. Key: " +
|
LOG.error("Disk Balancer - Unknown key in get balancer setting. Key: " +
|
||||||
key);
|
key);
|
||||||
|
|
|
@ -73,6 +73,7 @@ public class DiskBalancer {
|
||||||
private Future future;
|
private Future future;
|
||||||
private String planID;
|
private String planID;
|
||||||
private DiskBalancerWorkStatus.Result currentResult;
|
private DiskBalancerWorkStatus.Result currentResult;
|
||||||
|
private long bandwidth;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a Disk Balancer object. This object takes care of reading a
|
* Constructs a Disk Balancer object. This object takes care of reading a
|
||||||
|
@ -159,6 +160,7 @@ public class DiskBalancer {
|
||||||
createWorkPlan(nodePlan);
|
createWorkPlan(nodePlan);
|
||||||
this.planID = planID;
|
this.planID = planID;
|
||||||
this.currentResult = Result.PLAN_UNDER_PROGRESS;
|
this.currentResult = Result.PLAN_UNDER_PROGRESS;
|
||||||
|
this.bandwidth = bandwidth;
|
||||||
executePlan();
|
executePlan();
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
@ -248,6 +250,21 @@ public class DiskBalancer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the current bandwidth.
|
||||||
|
*
|
||||||
|
* @return string representation of bandwidth.
|
||||||
|
* @throws DiskBalancerException
|
||||||
|
*/
|
||||||
|
public long getBandwidth() throws DiskBalancerException {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
checkDiskBalancerEnabled();
|
||||||
|
return this.bandwidth;
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Throws if Disk balancer is disabled.
|
* Throws if Disk balancer is disabled.
|
||||||
|
|
|
@ -187,6 +187,22 @@ public class TestDiskBalancerRPC {
|
||||||
dataNode.getDiskBalancerSetting(invalidSetting);
|
dataNode.getDiskBalancerSetting(invalidSetting);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testgetDiskBalancerBandwidth() throws Exception {
|
||||||
|
RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
|
||||||
|
DataNode dataNode = rpcTestHelper.getDataNode();
|
||||||
|
String planHash = rpcTestHelper.getPlanHash();
|
||||||
|
int planVersion = rpcTestHelper.getPlanVersion();
|
||||||
|
NodePlan plan = rpcTestHelper.getPlan();
|
||||||
|
|
||||||
|
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
|
||||||
|
String bandwidthString = dataNode.getDiskBalancerSetting(
|
||||||
|
DiskBalancerConstants.DISKBALANCER_BANDWIDTH);
|
||||||
|
long value = Long.decode(bandwidthString);
|
||||||
|
Assert.assertEquals(10L, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueryPlan() throws Exception {
|
public void testQueryPlan() throws Exception {
|
||||||
|
@ -211,16 +227,6 @@ public class TestDiskBalancerRPC {
|
||||||
Assert.assertTrue(status.getResult() == NO_PLAN);
|
Assert.assertTrue(status.getResult() == NO_PLAN);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetDiskBalancerSetting() throws Exception {
|
|
||||||
final int dnIndex = 0;
|
|
||||||
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
|
|
||||||
thrown.expect(DiskBalancerException.class);
|
|
||||||
thrown.expect(new
|
|
||||||
ResultVerifier(Result.UNKNOWN_KEY));
|
|
||||||
dataNode.getDiskBalancerSetting(
|
|
||||||
DiskBalancerConstants.DISKBALANCER_BANDWIDTH);
|
|
||||||
}
|
|
||||||
|
|
||||||
private class RpcTestHelper {
|
private class RpcTestHelper {
|
||||||
private NodePlan plan;
|
private NodePlan plan;
|
||||||
|
|
Loading…
Reference in New Issue