HDFS-9595. DiskBalancer: Add cancelPlan RPC. (Contributed by Anu Engineer)

Arpit Agarwal 2016-01-15 16:08:49 -08:00
parent 7100c0da35
commit 0501d430e2
8 changed files with 127 additions and 10 deletions

ClientDatanodeProtocol.java

@@ -170,4 +170,11 @@ public interface ClientDatanodeProtocol {
void submitDiskBalancerPlan(String planID, long planVersion, long bandwidth,
String plan) throws IOException;
/**
 * Cancel an executing plan.
 *
 * @param planID - A SHA512 hash of the plan string.
 * @throws IOException on error
 */
void cancelDiskBalancePlan(String planID) throws IOException;
}
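The planID contract is shared with submitDiskBalancerPlan: a plan is addressed by the SHA-512 hex digest of its serialized form rather than by an opaque job id. A minimal caller-side sketch of the pairing, assuming a NodePlan named plan and a ClientDatanodeProtocol proxy named dnProtocol are already in scope (both assumed, not part of this patch):

    import org.apache.commons.codec.digest.DigestUtils;

    String planJson = plan.toJson();                  // serialize the NodePlan
    String planID = DigestUtils.sha512Hex(planJson);  // SHA-512 hex digest, per the javadoc
    dnProtocol.submitDiskBalancerPlan(planID, 0, 10, planJson);
    dnProtocol.cancelDiskBalancePlan(planID);         // cancel using the same hash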

ClientDatanodeProtocolTranslatorPB.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Shutdo
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.CancelPlanRequestProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -358,4 +359,22 @@ public class ClientDatanodeProtocolTranslatorPB implements
throw ProtobufHelper.getRemoteException(e);
}
}
/**
 * Cancels an executing disk balancer plan.
 *
 * @param planID - A SHA512 hash of the plan string.
 * @throws IOException on error
 */
@Override
public void cancelDiskBalancePlan(String planID)
throws IOException {
try {
CancelPlanRequestProto request = CancelPlanRequestProto.newBuilder()
.setPlanID(planID).build();
rpcProxy.cancelDiskBalancerPlan(NULL_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}
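The translator follows the usual client-side PB shape: pack the arguments into the generated request message, invoke the stub with NULL_CONTROLLER, and convert any ServiceException back into the remote IOException via ProtobufHelper.getRemoteException. For orientation, a hedged sketch of how an admin tool might reach this code; the proxy helper named here is the style DFSAdmin uses for other datanode RPCs, but its exact name and overloads vary by branch, so treat the creation call as an assumption:

    // Sketch only: datanodeID, conf, socketTimeout, connectToDnViaHostname
    // and planID are assumed to be in scope; the helper signature is assumed.
    ClientDatanodeProtocol dnProtocol =
        DFSUtil.createClientDatanodeProtocolProxy(datanodeID, conf,
            socketTimeout, connectToDnViaHostname);
    dnProtocol.cancelDiskBalancePlan(planID);  // remote failures surface as IOException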

ClientDatanodeProtocol.proto

@@ -166,6 +166,20 @@ message SubmitDiskBalancerPlanRequestProto {
message SubmitDiskBalancerPlanResponseProto {
}
/**
* This message describes a request to cancel an
* outstanding disk balancer plan
*/
message CancelPlanRequestProto {
required string planID = 1;
}
/**
* This is the response for the cancellation request
*/
message CancelPlanResponseProto {
}
/**
* Protocol used from client to the Datanode.
* See the request and response for details of rpc call.
@@ -230,4 +244,9 @@ service ClientDatanodeProtocolService {
*/
rpc submitDiskBalancerPlan(SubmitDiskBalancerPlanRequestProto)
returns (SubmitDiskBalancerPlanResponseProto);
/**
* Cancel an executing plan
*/
rpc cancelDiskBalancerPlan(CancelPlanRequestProto)
returns (CancelPlanResponseProto);
}
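Two details of the wire contract: the request carries nothing but the plan hash, and the response message is deliberately empty, so success is signaled by the RPC completing and failures travel back as exceptions through the RPC layer. From "required string planID = 1" protoc generates the usual builder and accessor, roughly:

    // Generated-API sketch; the hash literal is a placeholder, not a real digest.
    CancelPlanRequestProto req = CancelPlanRequestProto.newBuilder()
        .setPlanID("ab12...")    // required field 1: SHA-512 hex of the plan
        .build();
    String id = req.getPlanID();
    CancelPlanResponseProto ok = CancelPlanResponseProto.getDefaultInstance();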

HDFS-1312_CHANGES.txt

@@ -16,3 +16,6 @@ HDFS-1312 Change Log
HDFS-9588. DiskBalancer: Add submitDiskbalancer RPC. (Anu Engineer via
Arpit Agarwal)
HDFS-9595. DiskBalancer: Add cancelPlan RPC. (Anu Engineer via
Arpit Agarwal)

ClientDatanodeProtocolServerSideTranslatorPB.java

@@ -49,6 +49,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Trigge
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.CancelPlanRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.CancelPlanResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -259,4 +261,24 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
throw new ServiceException(e);
}
}
/**
 * Cancel an executing plan.
 *
 * @param controller - RpcController
 * @param request - CancelPlanRequestProto
 * @return CancelPlanResponseProto
 * @throws ServiceException on error
 */
@Override
public CancelPlanResponseProto cancelDiskBalancerPlan(
RpcController controller, CancelPlanRequestProto request)
throws ServiceException {
try {
impl.cancelDiskBalancePlan(request.getPlanID());
return CancelPlanResponseProto.newBuilder().build();
} catch (Exception e) {
throw new ServiceException(e);
}
}
}
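This is the mirror image of the client-side translator: exceptions from impl.cancelDiskBalancePlan are wrapped in a ServiceException here, shipped across the RPC boundary, and unwrapped on the client by ProtobufHelper.getRemoteException. Schematically, for the current not-implemented stub:

    // DataNode.cancelDiskBalancePlan throws DiskbalancerException("Not Implemented", 0)
    //   -> server translator:  throw new ServiceException(e);
    //   -> RPC layer carries it to the client as a RemoteException
    //   -> client translator:  throw ProtobufHelper.getRemoteException(e);
    //   -> caller sees the DiskbalancerException as an IOException again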

DataNode.java

@@ -3310,4 +3310,11 @@ public class DataNode extends ReconfigurableBase
throw new DiskbalancerException("Not Implemented", 0);
}
@Override
public void cancelDiskBalancePlan(String planID) throws IOException {
checkSuperuserPrivilege();
throw new DiskbalancerException("Not Implemented", 0);
}
}
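Both datanode entry points are deliberate stubs at this point in the HDFS-1312 branch: they enforce the superuser check and then fail until the execution engine lands in a later patch. Purely as a hypothetical sketch of the eventual shape (the diskBalancer field and its cancelPlan method do not exist in this patch):

    @Override
    public void cancelDiskBalancePlan(String planID) throws IOException {
      checkSuperuserPrivilege();        // admin-only, same as the stub
      diskBalancer.cancelPlan(planID);  // hypothetical: find the running plan by hash and stop it
    }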

GreedyPlanner.java

@@ -17,6 +17,7 @@
package org.apache.hadoop.hdfs.server.diskbalancer.planner;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
@@ -90,6 +91,9 @@ public class GreedyPlanner implements Planner {
public void balanceVolumeSet(DiskBalancerDataNode node,
DiskBalancerVolumeSet vSet, NodePlan plan)
throws Exception {
Preconditions.checkNotNull(vSet);
Preconditions.checkNotNull(plan);
Preconditions.checkNotNull(node);
DiskBalancerVolumeSet currentSet = new DiskBalancerVolumeSet(vSet);
while (currentSet.isBalancingNeeded(this.threshold)) {
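The new checkNotNull guards make balanceVolumeSet fail fast at its public entry point instead of surfacing a NullPointerException from deep inside the balancing loop, since Guava's Preconditions.checkNotNull throws the moment it is handed null. Illustrative call, not from this patch:

    // With the guards in place, a null volume set is rejected before any work starts:
    planner.balanceVolumeSet(node, null, plan);  // throws NullPointerException at entry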

TestDiskBalancerRPC.java

@@ -42,10 +42,10 @@ public class TestDiskBalancerRPC {
public ExpectedException thrown = ExpectedException.none();
private MiniDFSCluster cluster;
private Configuration conf;
@Before
public void setUp() throws Exception {
-    Configuration conf = new HdfsConfiguration();
+    conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.waitActive();
}
@@ -59,21 +59,54 @@ public class TestDiskBalancerRPC {
@Test
public void TestSubmitTestRpc() throws Exception {
-    URI clusterJson = getClass()
-        .getResource("/diskBalancer/data-cluster-3node-3disk.json").toURI();
-    ClusterConnector jsonConnector = ConnectorFactory.getCluster(clusterJson,
-        null);
-    DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(jsonConnector);
+    final int dnIndex = 0;
+    cluster.restartDataNode(dnIndex);
+    cluster.waitActive();
+    ClusterConnector nameNodeConnector =
+        ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+    DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector);
diskBalancerCluster.readClusterInfo();
-    Assert.assertEquals(3, diskBalancerCluster.getNodes().size());
+    Assert.assertEquals(cluster.getDataNodes().size(),
+        diskBalancerCluster.getNodes().size());
diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(dnIndex);
GreedyPlanner planner = new GreedyPlanner(10.0f, node);
NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort());
planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
final int planVersion = 0; // So far we support only one version.
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
String planHash = DigestUtils.sha512Hex(plan.toJson());
// Since submitDiskBalancerPlan is not implemented yet, it throws an
// Exception, this will be modified with the actual implementation.
thrown.expect(DiskbalancerException.class);
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
}
@Test
public void TestCancelTestRpc() throws Exception {
final int dnIndex = 0;
cluster.restartDataNode(dnIndex);
cluster.waitActive();
ClusterConnector nameNodeConnector =
ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector);
diskBalancerCluster.readClusterInfo();
Assert.assertEquals(cluster.getDataNodes().size(),
diskBalancerCluster.getNodes().size());
diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0);
GreedyPlanner planner = new GreedyPlanner(10.0f, node);
NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort());
planner.balanceVolumeSet(node, node.getVolumeSets().get("SSD"), plan);
planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
-    final int dnIndex = 0;
final int planVersion = 0; // So far we support only one version.
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
String planHash = DigestUtils.sha512Hex(plan.toJson());
@@ -83,5 +116,8 @@ public class TestDiskBalancerRPC {
try {
  dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
} catch (DiskbalancerException ex) {
  // Expected until submit is implemented; swallow it so cancel actually runs.
}
thrown.expect(DiskbalancerException.class);
dataNode.cancelDiskBalancePlan(planHash);
}
}