HDFS-9683. DiskBalancer: Add cancelPlan implementation. (Contributed by Anu Engineer)

Arpit Agarwal 2016-03-03 17:00:52 -08:00
parent e646c2eb50
commit 9847640603
5 changed files with 172 additions and 78 deletions

View File

@@ -30,3 +30,6 @@ HDFS-1312 Change Log
     HDFS-9681. DiskBalancer: Add QueryPlan implementation. (Anu Engineer via
     Arpit Agarwal)
+    HDFS-9683. DiskBalancer: Add cancelPlan implementation. (Anu Engineer via
+    Arpit Agarwal)

View File

@@ -3337,12 +3337,15 @@ public class DataNode extends ReconfigurableBase
     this.diskBalancer.submitPlan(planID, planVersion, plan, bandwidth, false);
   }
 
+  /**
+   * Cancels a running plan.
+   * @param planID - Hash string that identifies a plan
+   */
   @Override
   public void cancelDiskBalancePlan(String planID) throws
       IOException {
     checkSuperuserPrivilege();
-    throw new DiskBalancerException("Not Implemented",
-        DiskBalancerException.Result.INTERNAL_ERROR);
+    this.diskBalancer.cancelPlan(planID);
   }
 
   /**
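The stub that threw INTERNAL_ERROR is replaced by a one-line delegation guarded by checkSuperuserPrivilege(). A minimal caller sketch, assuming a NodePlan named plan and a DataNode handle set up the way the tests below do; only calls that appear in this commit are used:

    // Cancel by the same SHA-512 hash that was used at submit time.
    // Plan version 1 is the only version this branch supports.
    String planHash = DigestUtils.sha512Hex(plan.toJson());
    dataNode.submitDiskBalancerPlan(planHash, 1, 10, plan.toJson());
    dataNode.cancelDiskBalancePlan(planHash); // DiskBalancerException on an unknown hash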
View File

@@ -196,6 +196,32 @@ public class DiskBalancer {
     }
   }
 
+  /**
+   * Cancels a running plan.
+   * @param planID - Hash of the plan to cancel.
+   * @throws DiskBalancerException
+   */
+  public void cancelPlan(String planID) throws DiskBalancerException {
+    lock.lock();
+    try {
+      checkDiskBalancerEnabled();
+      if ((this.planID == null) || (!this.planID.equals(planID))) {
+        LOG.error("Disk Balancer - No such plan. Cancel plan failed. PlanID: " +
+            planID);
+        throw new DiskBalancerException("No such plan.",
+            DiskBalancerException.Result.NO_SUCH_PLAN);
+      }
+      if (!this.future.isDone()) {
+        this.blockMover.setExitFlag();
+        shutdownExecutor();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
   /**
    * Throws if Disk balancer is disabled.
    *
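cancelPlan() never interrupts the mover thread: it flips the block mover's exit flag and tears down the executor, so the copy loop stops at its next flag check. Because future.isDone() is tested first, cancelling an already-finished plan is a no-op beyond the hash validation. A self-contained sketch of that cooperative-cancellation pattern (class and method names here are illustrative, not the actual DiskBalancer internals):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Illustrative stand-in for the block mover: the worker polls a shared
    // exit flag between units of work, so a cancel request takes effect at
    // the next check instead of via Thread.interrupt().
    public class CancelDemo {
      static class Mover implements Runnable {
        private final AtomicBoolean shouldRun = new AtomicBoolean(true);

        void setExitFlag() {
          shouldRun.set(false);   // ask the loop to stop at its next check
        }

        @Override
        public void run() {
          while (shouldRun.get()) {
            // move one block, then re-check the flag
          }
        }
      }

      public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Mover mover = new Mover();
        Future<?> future = executor.submit(mover);

        // The cancel path mirrors cancelPlan(): only act if work is running.
        if (!future.isDone()) {
          mover.setExitFlag();
          executor.shutdown();    // let the in-flight task wind down
        }
      }
    }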

View File

@@ -35,7 +35,8 @@ public class DiskBalancerException extends IOException {
     PLAN_ALREADY_IN_PROGRESS,
     INVALID_VOLUME,
     INVALID_MOVE,
-    INTERNAL_ERROR
+    INTERNAL_ERROR,
+    NO_SUCH_PLAN
   }
 
   private final Result result;
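The new NO_SUCH_PLAN result lets a caller distinguish a stale or mistyped hash from a genuine internal failure. A hedged call-site sketch, assuming the exception exposes its Result through a getResult() accessor (the accessor is not shown in this hunk):

    try {
      dataNode.cancelDiskBalancePlan(planHash);
    } catch (DiskBalancerException ex) {
      if (ex.getResult() == DiskBalancerException.Result.NO_SUCH_PLAN) {
        // planHash did not match the currently executing plan
      } else {
        throw ex;   // propagate everything else unchanged
      }
    }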

View File

@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p/>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -37,6 +37,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
 import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
 import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_UNDER_PROGRESS;
@@ -63,103 +64,163 @@ public class TestDiskBalancerRPC {
   }
 
   @Test
-  public void testSubmitTestRpc() throws Exception {
-    final int dnIndex = 0;
-    cluster.restartDataNode(dnIndex);
-    cluster.waitActive();
-    ClusterConnector nameNodeConnector =
-        ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
-    DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector);
-    diskBalancerCluster.readClusterInfo();
-    Assert.assertEquals(cluster.getDataNodes().size(),
-        diskBalancerCluster.getNodes().size());
-    diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
-    DataNode dataNode = cluster.getDataNodes().get(dnIndex);
-    DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID(
-        dataNode.getDatanodeUuid());
-    GreedyPlanner planner = new GreedyPlanner(10.0f, node);
-    NodePlan plan = new NodePlan(node.getDataNodeName(),
-        node.getDataNodePort());
-    planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
-    final int planVersion = 1; // So far we support only one version.
-    String planHash = DigestUtils.sha512Hex(plan.toJson());
+  public void testSubmitPlan() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = rpcTestHelper.getPlanHash();
+    int planVersion = rpcTestHelper.getPlanVersion();
+    NodePlan plan = rpcTestHelper.getPlan();
     dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
   }
   @Test
-  public void testCancelTestRpc() throws Exception {
-    final int dnIndex = 0;
-    cluster.restartDataNode(dnIndex);
-    cluster.waitActive();
-    ClusterConnector nameNodeConnector =
-        ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+  public void testSubmitPlanWithInvalidHash() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = rpcTestHelper.getPlanHash();
+    char hashArray[] = planHash.toCharArray();
+    hashArray[0]++;
+    planHash = String.valueOf(hashArray);
+    int planVersion = rpcTestHelper.getPlanVersion();
+    NodePlan plan = rpcTestHelper.getPlan();
+    thrown.expect(DiskBalancerException.class);
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+  }
 
-    DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector);
-    diskBalancerCluster.readClusterInfo();
-    Assert.assertEquals(cluster.getDataNodes().size(),
-        diskBalancerCluster.getNodes().size());
-    diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
-    DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0);
-    GreedyPlanner planner = new GreedyPlanner(10.0f, node);
-    NodePlan plan = new NodePlan(node.getDataNodeName(),
-        node.getDataNodePort());
-    planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
+  @Test
+  public void testSubmitPlanWithInvalidVersion() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = rpcTestHelper.getPlanHash();
+    int planVersion = rpcTestHelper.getPlanVersion();
+    planVersion++;
+    NodePlan plan = rpcTestHelper.getPlan();
+    thrown.expect(DiskBalancerException.class);
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+  }
 
-    final int planVersion = 0; // So far we support only one version.
-    DataNode dataNode = cluster.getDataNodes().get(dnIndex);
-    String planHash = DigestUtils.sha512Hex(plan.toJson());
 
+  @Test
+  public void testSubmitPlanWithInvalidPlan() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = rpcTestHelper.getPlanHash();
+    int planVersion = rpcTestHelper.getPlanVersion();
+    NodePlan plan = rpcTestHelper.getPlan();
+    thrown.expect(DiskBalancerException.class);
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, "");
+  }
 
-    // Since submitDiskBalancerPlan is not implemented yet, it throws an
-    // Exception, this will be modified with the actual implementation.
-    try {
-      dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
-    } catch (DiskBalancerException ex) {
-      // Let us ignore this for time being.
-    }
 
+  @Test
+  public void testCancelPlan() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = rpcTestHelper.getPlanHash();
+    int planVersion = rpcTestHelper.getPlanVersion();
+    NodePlan plan = rpcTestHelper.getPlan();
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
     dataNode.cancelDiskBalancePlan(planHash);
   }
   @Test
+  public void testCancelNonExistentPlan() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = rpcTestHelper.getPlanHash();
+    char hashArray[] = planHash.toCharArray();
+    hashArray[0]++;
+    planHash = String.valueOf(hashArray);
+    NodePlan plan = rpcTestHelper.getPlan();
+    thrown.expect(DiskBalancerException.class);
+    dataNode.cancelDiskBalancePlan(planHash);
+  }
 
   @Test
-  public void testQueryTestRpc() throws Exception {
-    final int dnIndex = 0;
-    cluster.restartDataNode(dnIndex);
-    cluster.waitActive();
-    ClusterConnector nameNodeConnector =
-        ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+  public void testCancelEmptyPlan() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = "";
+    NodePlan plan = rpcTestHelper.getPlan();
+    thrown.expect(DiskBalancerException.class);
+    dataNode.cancelDiskBalancePlan(planHash);
+  }
 
-    DiskBalancerCluster diskBalancerCluster =
-        new DiskBalancerCluster(nameNodeConnector);
-    diskBalancerCluster.readClusterInfo();
-    Assert.assertEquals(cluster.getDataNodes().size(),
-        diskBalancerCluster.getNodes().size());
-    diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
-    DataNode dataNode = cluster.getDataNodes().get(dnIndex);
-    DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID(
-        dataNode.getDatanodeUuid());
-    GreedyPlanner planner = new GreedyPlanner(10.0f, node);
-    NodePlan plan = new NodePlan(node.getDataNodeName(),
-        node.getDataNodePort());
-    planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
-    final int planVersion = 1; // So far we support only one version.
-    String planHash = DigestUtils.sha512Hex(plan.toJson());
-    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
+  @Test
+  public void testQueryPlan() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    String planHash = rpcTestHelper.getPlanHash();
+    int planVersion = rpcTestHelper.getPlanVersion();
+    NodePlan plan = rpcTestHelper.getPlan();
+    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
 
     DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan();
     Assert.assertTrue(status.getResult() == PLAN_UNDER_PROGRESS ||
         status.getResult() == PLAN_DONE);
   }
   @Test
-  public void testgetDiskBalancerSetting() throws Exception {
+  public void testQueryPlanWithoutSubmit() throws Exception {
+    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
+    DataNode dataNode = rpcTestHelper.getDataNode();
+    DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan();
+    Assert.assertTrue(status.getResult() == NO_PLAN);
+  }
+
+  @Test
+  public void testGetDiskBalancerSetting() throws Exception {
     final int dnIndex = 0;
     DataNode dataNode = cluster.getDataNodes().get(dnIndex);
     thrown.expect(DiskBalancerException.class);
     dataNode.getDiskBalancerSetting(
         DiskBalancerConstants.DISKBALANCER_BANDWIDTH);
   }
 
+  private class RpcTestHelper {
+    private NodePlan plan;
+    private int planVersion;
+    private DataNode dataNode;
+    private String planHash;
+
+    public NodePlan getPlan() {
+      return plan;
+    }
+
+    public int getPlanVersion() {
+      return planVersion;
+    }
+
+    public DataNode getDataNode() {
+      return dataNode;
+    }
+
+    public String getPlanHash() {
+      return planHash;
+    }
+
+    public RpcTestHelper invoke() throws Exception {
+      final int dnIndex = 0;
+      cluster.restartDataNode(dnIndex);
+      cluster.waitActive();
+      ClusterConnector nameNodeConnector =
+          ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+      DiskBalancerCluster diskBalancerCluster =
+          new DiskBalancerCluster(nameNodeConnector);
+      diskBalancerCluster.readClusterInfo();
+      Assert.assertEquals(cluster.getDataNodes().size(),
+          diskBalancerCluster.getNodes().size());
+      diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
+      dataNode = cluster.getDataNodes().get(dnIndex);
+      DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID(
+          dataNode.getDatanodeUuid());
+      GreedyPlanner planner = new GreedyPlanner(10.0f, node);
+      plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort());
+      planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
+      planVersion = 1;
+      planHash = DigestUtils.sha512Hex(plan.toJson());
+      return this;
+    }
+  }
 }
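A side note on the invalid-hash tests above: bumping the first character of the hex digest is enough to guarantee a mismatch, since any single-character change breaks string equality with the stored plan hash. A standalone sketch of the same trick (the class name and input string are made up for illustration):

    import org.apache.commons.codec.digest.DigestUtils;

    // Demonstrates the hash-corruption trick the tests use: incrementing
    // the first hex character yields a digest that can never match.
    public class HashTrickDemo {
      public static void main(String[] args) {
        String planHash = DigestUtils.sha512Hex("{\"nodeName\":\"dn-0\"}");
        char[] hashArray = planHash.toCharArray();
        hashArray[0]++;                                  // e.g. 'a' -> 'b'
        String corrupted = String.valueOf(hashArray);
        System.out.println(planHash.equals(corrupted));  // prints false
      }
    }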