diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt index 07403cf6bf0..919d73e6b72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt @@ -30,3 +30,6 @@ HDFS-1312 Change Log HDFS-9681. DiskBalancer: Add QueryPlan implementation. (Anu Engineer via Arpit Agarwal) + HDFS-9683. DiskBalancer: Add cancelPlan implementation. (Anu Engineer via + Arpit Agarwal) + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 56585a88a4d..126deb4be29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -3337,12 +3337,15 @@ public class DataNode extends ReconfigurableBase this.diskBalancer.submitPlan(planID, planVersion, plan, bandwidth, false); } + /** + * Cancels a running plan. + * @param planID - Hash string that identifies a plan + */ @Override public void cancelDiskBalancePlan(String planID) throws IOException { checkSuperuserPrivilege(); - throw new DiskBalancerException("Not Implemented", - DiskBalancerException.Result.INTERNAL_ERROR); + this.diskBalancer.cancelPlan(planID); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java index c01fb4e8e82..81dbb2d2840 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java @@ -196,6 +196,32 @@ public class DiskBalancer { } } + /** + * Cancels a running plan. + * @param planID - Hash of the plan to cancel. + * @throws DiskBalancerException + */ + public void cancelPlan(String planID) throws DiskBalancerException { + lock.lock(); + try { + checkDiskBalancerEnabled(); + if ((this.planID == null) || (!this.planID.equals(planID))) { + LOG.error("Disk Balancer - No such plan. Cancel plan failed. PlanID: " + + planID); + throw new DiskBalancerException("No such plan.", + DiskBalancerException.Result.NO_SUCH_PLAN); + } + if (!this.future.isDone()) { + this.blockMover.setExitFlag(); + shutdownExecutor(); + } + } finally { + lock.unlock(); + } + } + + + /** * Throws if Disk balancer is disabled. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java index a5e158151a9..00fe53d9c6e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java @@ -35,7 +35,8 @@ public class DiskBalancerException extends IOException { PLAN_ALREADY_IN_PROGRESS, INVALID_VOLUME, INVALID_MOVE, - INTERNAL_ERROR + INTERNAL_ERROR, + NO_SUCH_PLAN } private final Result result; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java index 974e9738ffe..e29b3b792bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *
+ * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -37,6 +37,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN; import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE; import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_UNDER_PROGRESS; @@ -63,103 +64,163 @@ public class TestDiskBalancerRPC { } @Test - public void testSubmitTestRpc() throws Exception { - final int dnIndex = 0; - cluster.restartDataNode(dnIndex); - cluster.waitActive(); - ClusterConnector nameNodeConnector = - ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf); - - DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector); - diskBalancerCluster.readClusterInfo(); - Assert.assertEquals(cluster.getDataNodes().size(), - diskBalancerCluster.getNodes().size()); - diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes()); - - DataNode dataNode = cluster.getDataNodes().get(dnIndex); - DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID( - dataNode.getDatanodeUuid()); - GreedyPlanner planner = new GreedyPlanner(10.0f, node); - NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort - ()); - planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan); - final int planVersion = 1; // So far we support only one version. - - String planHash = DigestUtils.sha512Hex(plan.toJson()); - + public void testSubmitPlan() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = rpcTestHelper.getPlanHash(); + int planVersion = rpcTestHelper.getPlanVersion(); + NodePlan plan = rpcTestHelper.getPlan(); dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); } @Test - public void testCancelTestRpc() throws Exception { - final int dnIndex = 0; - cluster.restartDataNode(dnIndex); - cluster.waitActive(); - ClusterConnector nameNodeConnector = - ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf); + public void testSubmitPlanWithInvalidHash() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = rpcTestHelper.getPlanHash(); + char hashArray[] = planHash.toCharArray(); + hashArray[0]++; + planHash = String.valueOf(hashArray); + int planVersion = rpcTestHelper.getPlanVersion(); + NodePlan plan = rpcTestHelper.getPlan(); + thrown.expect(DiskBalancerException.class); + dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + } - DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector); - diskBalancerCluster.readClusterInfo(); - Assert.assertEquals(cluster.getDataNodes().size(), - diskBalancerCluster.getNodes().size()); - diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes()); - DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0); - GreedyPlanner planner = new GreedyPlanner(10.0f, node); - NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort - ()); - planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan); + @Test + public void testSubmitPlanWithInvalidVersion() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = rpcTestHelper.getPlanHash(); + int planVersion = rpcTestHelper.getPlanVersion(); + planVersion++; + NodePlan plan = rpcTestHelper.getPlan(); + thrown.expect(DiskBalancerException.class); + dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + } - final int planVersion = 0; // So far we support only one version. - DataNode dataNode = cluster.getDataNodes().get(dnIndex); - String planHash = DigestUtils.sha512Hex(plan.toJson()); + @Test + public void testSubmitPlanWithInvalidPlan() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = rpcTestHelper.getPlanHash(); + int planVersion = rpcTestHelper.getPlanVersion(); + NodePlan plan = rpcTestHelper.getPlan(); + thrown.expect(DiskBalancerException.class); + dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, ""); + } - // Since submitDiskBalancerPlan is not implemented yet, it throws an - // Exception, this will be modified with the actual implementation. - try { - dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); - } catch (DiskBalancerException ex) { - // Let us ignore this for time being. - } + @Test + public void testCancelPlan() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = rpcTestHelper.getPlanHash(); + int planVersion = rpcTestHelper.getPlanVersion(); + NodePlan plan = rpcTestHelper.getPlan(); + dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + dataNode.cancelDiskBalancePlan(planHash); + } + + @Test + public void testCancelNonExistentPlan() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = rpcTestHelper.getPlanHash(); + char hashArray[] = planHash.toCharArray(); + hashArray[0]++; + planHash = String.valueOf(hashArray); + NodePlan plan = rpcTestHelper.getPlan(); thrown.expect(DiskBalancerException.class); dataNode.cancelDiskBalancePlan(planHash); } @Test - public void testQueryTestRpc() throws Exception { - final int dnIndex = 0; - cluster.restartDataNode(dnIndex); - cluster.waitActive(); - ClusterConnector nameNodeConnector = - ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf); + public void testCancelEmptyPlan() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = ""; + NodePlan plan = rpcTestHelper.getPlan(); + thrown.expect(DiskBalancerException.class); + dataNode.cancelDiskBalancePlan(planHash); + } - DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster - (nameNodeConnector); - diskBalancerCluster.readClusterInfo(); - Assert.assertEquals(cluster.getDataNodes().size(), - diskBalancerCluster.getNodes().size()); - diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes()); - DataNode dataNode = cluster.getDataNodes().get(dnIndex); - DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID( - dataNode.getDatanodeUuid()); - GreedyPlanner planner = new GreedyPlanner(10.0f, node); - NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort - ()); - planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan); - final int planVersion = 1; // So far we support only one version. - String planHash = DigestUtils.sha512Hex(plan.toJson()); - dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + @Test + public void testQueryPlan() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + String planHash = rpcTestHelper.getPlanHash(); + int planVersion = rpcTestHelper.getPlanVersion(); + NodePlan plan = rpcTestHelper.getPlan(); + + dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan(); Assert.assertTrue(status.getResult() == PLAN_UNDER_PROGRESS || status.getResult() == PLAN_DONE); } @Test - public void testgetDiskBalancerSetting() throws Exception { + public void testQueryPlanWithoutSubmit() throws Exception { + RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); + DataNode dataNode = rpcTestHelper.getDataNode(); + + DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan(); + Assert.assertTrue(status.getResult() == NO_PLAN); + } + + @Test + public void testGetDiskBalancerSetting() throws Exception { final int dnIndex = 0; DataNode dataNode = cluster.getDataNodes().get(dnIndex); thrown.expect(DiskBalancerException.class); dataNode.getDiskBalancerSetting( DiskBalancerConstants.DISKBALANCER_BANDWIDTH); } + + private class RpcTestHelper { + private NodePlan plan; + private int planVersion; + private DataNode dataNode; + private String planHash; + + public NodePlan getPlan() { + return plan; + } + + public int getPlanVersion() { + return planVersion; + } + + public DataNode getDataNode() { + return dataNode; + } + + public String getPlanHash() { + return planHash; + } + + public RpcTestHelper invoke() throws Exception { + final int dnIndex = 0; + cluster.restartDataNode(dnIndex); + cluster.waitActive(); + ClusterConnector nameNodeConnector = + ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf); + + DiskBalancerCluster diskBalancerCluster = + new DiskBalancerCluster(nameNodeConnector); + diskBalancerCluster.readClusterInfo(); + Assert.assertEquals(cluster.getDataNodes().size(), + diskBalancerCluster.getNodes().size()); + diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes()); + dataNode = cluster.getDataNodes().get(dnIndex); + DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID( + dataNode.getDatanodeUuid()); + GreedyPlanner planner = new GreedyPlanner(10.0f, node); + plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort()); + planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan); + planVersion = 1; + planHash = DigestUtils.sha512Hex(plan.toJson()); + return this; + } + } }