diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java index e5413884535..6e9cef063d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -163,4 +163,11 @@ public interface ClientDatanodeProtocol { * @return balancer bandwidth */ long getBalancerBandwidth() throws IOException; + + /** + * Submit a disk balancer plan for execution. + */ + void submitDiskBalancerPlan(String planID, long planVersion, long bandwidth, + String plan) throws IOException; + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index 6aaa0257d86..da8d962099f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetRe import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanRequestProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -326,4 +327,35 @@ public class ClientDatanodeProtocolTranslatorPB implements throw ProtobufHelper.getRemoteException(e); } } + + /** + * Submits a disk balancer plan to the datanode. + * @param planID - Plan ID is the hash512 string of the plan that is + * submitted. This is used by clients when they want to find + * local copies of these plans. + * @param planVersion - The data format of the plans - for future , not + * used now. + * @param bandwidth - Maximum disk bandwidth to consume, setting this value + * to zero allows datanode to use the value defined in + * configration. + * @param plan - Actual plan. + * @return Success or throws Exception. + * @throws Exception + */ + @Override + public void submitDiskBalancerPlan(String planID, long planVersion, + long bandwidth, String plan) throws IOException { + try { + SubmitDiskBalancerPlanRequestProto request = + SubmitDiskBalancerPlanRequestProto.newBuilder() + .setPlanID(planID) + .setPlanVersion(planVersion) + .setMaxDiskBandwidth(bandwidth) + .setPlan(plan) + .build(); + rpcProxy.submitDiskBalancerPlan(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto index e135df84fb3..d11979b5e5d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto @@ -149,6 +149,23 @@ message GetBalancerBandwidthResponseProto { required uint64 bandwidth = 1; } +/** + * This message allows a client to submit a disk + * balancer plan to a data node. + */ +message SubmitDiskBalancerPlanRequestProto { + required string planID = 1; // A hash of the plan like SHA512 + required string plan = 2; // Json String that describes the plan + optional uint64 planVersion = 3; // Plan version number + optional uint64 maxDiskBandwidth = 4; // optional bandwidth control. +} + +/** + * Response from the DataNode on Plan Submit request + */ +message SubmitDiskBalancerPlanResponseProto { +} + /** * Protocol used from client to the Datanode. * See the request and response for details of rpc call. @@ -207,4 +224,10 @@ service ClientDatanodeProtocolService { */ rpc getBalancerBandwidth(GetBalancerBandwidthRequestProto) returns(GetBalancerBandwidthResponseProto); + + /** + * Submit a disk balancer plan for execution + */ + rpc submitDiskBalancerPlan(SubmitDiskBalancerPlanRequestProto) + returns (SubmitDiskBalancerPlanResponseProto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt index 940e1b526fb..6d8cde02471 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt @@ -13,3 +13,6 @@ HDFS-1312 Change Log HDFS-9469. DiskBalancer: Add Planner. (Anu Engineer via Arpit Agarwal) + HDFS-9588. DiskBalancer: Add submitDiskbalancer RPC. (Anu Engineer via + Arpit Agarwal) + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index e0401f739f0..824f0500f68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -47,6 +47,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.Start import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanResponseProto; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -232,4 +234,29 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements return GetBalancerBandwidthResponseProto.newBuilder() .setBandwidth(bandwidth).build(); } + + /** + * Submit a disk balancer plan for execution. + * @param controller - RpcController + * @param request - Request + * @return Response + * @throws ServiceException + */ + @Override + public SubmitDiskBalancerPlanResponseProto submitDiskBalancerPlan( + RpcController controller, SubmitDiskBalancerPlanRequestProto request) + throws ServiceException { + try { + impl.submitDiskBalancerPlan(request.getPlanID(), + request.hasPlanVersion() ? request.getPlanVersion() : 0, + request.hasMaxDiskBandwidth() ? request.getMaxDiskBandwidth() : 0, + request.getPlan()); + SubmitDiskBalancerPlanResponseProto response = + SubmitDiskBalancerPlanResponseProto.newBuilder() + .build(); + return response; + } catch(Exception e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index a59a59ff74e..e06555f2b8d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -169,6 +169,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer; +import org.apache.hadoop.hdfs.server.diskbalancer.DiskbalancerException; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -3286,4 +3287,27 @@ public class DataNode extends ReconfigurableBase public Tracer getTracer() { return tracer; } + + /** + * Allows submission of a disk balancer Job. + * @param planID - Hash value of the plan. + * @param planVersion - Plan version, reserved for future use. We have only + * version 1 now. + * @param bandwidth - Max disk bandwidth to use, 0 means use value defined + * in the configration. + * @param plan - Actual plan + * @return success or throws an exception. + * @throws Exception + */ + @Override + public void submitDiskBalancerPlan(String planID, + long planVersion, long bandwidth, String plan) throws IOException { + + // TODO : This will be replaced with actual code later. + // Right now throwing DiskbalancerException instead + // NotImplementedException to indicate the eventually disk balancer code + // will throw DiskbalancerException. + throw new DiskbalancerException("Not Implemented", 0); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java new file mode 100644 index 00000000000..9d47dc33c8d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskbalancerException.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdfs.server.diskbalancer; + +import java.io.IOException; + +/** + * Disk Balancer Exceptions. + */ +public class DiskbalancerException extends IOException { + private int result; + + /** + * Constructs an {@code IOException} with the specified detail message. + * + * @param message The detail message (which is saved for later retrieval by + * the + * {@link #getMessage()} method) + */ + public DiskbalancerException(String message, int result) { + super(message); + this.result = result; + } + + /** + * Constructs an {@code IOException} with the specified detail message and + * cause. + * + *Note that the detail message associated with {@code cause} is + * not + * automatically incorporated into this exception's detail message. + * + * @param message The detail message (which is saved for later retrieval by + * the + * {@link #getMessage()} method) + * @param cause The cause (which is saved for later retrieval by the {@link + * #getCause()} method). (A null value is permitted, and + * indicates that the cause is nonexistent or unknown.) + * @since 1.6 + */ + public DiskbalancerException(String message, Throwable cause, int result) { + super(message, cause); + this.result = result; + } + + /** + * Constructs an {@code IOException} with the specified cause and a detail + * message of {@code (cause==null ? null : cause.toString())} (which typically + * contains the class and detail message of {@code cause}). This + * constructor is + * useful for IO exceptions that are little more than wrappers for other + * throwables. + * + * @param cause The cause (which is saved for later retrieval by the {@link + * #getCause()} method). (A null value is permitted, and + * indicates + * that the cause is nonexistent or unknown.) + * @since 1.6 + */ + public DiskbalancerException(Throwable cause, int result) { + super(cause); + this.result = result; + } + + /** + * Returns the result. + * @return int + */ + public int getResult() { + return result; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java new file mode 100644 index 00000000000..e047d5a513c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdfs.server.diskbalancer; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector; +import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode; +import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner; +import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.net.URI; + +public class TestDiskBalancerRPC { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private MiniDFSCluster cluster; + + @Before + public void setUp() throws Exception { + Configuration conf = new HdfsConfiguration(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + cluster.waitActive(); + } + + @After + public void tearDown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void TestSubmitTestRpc() throws Exception { + URI clusterJson = getClass() + .getResource("/diskBalancer/data-cluster-3node-3disk.json").toURI(); + ClusterConnector jsonConnector = ConnectorFactory.getCluster(clusterJson, + null); + DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(jsonConnector); + diskBalancerCluster.readClusterInfo(); + Assert.assertEquals(3, diskBalancerCluster.getNodes().size()); + diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes()); + DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0); + GreedyPlanner planner = new GreedyPlanner(10.0f, node); + NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort + ()); + planner.balanceVolumeSet(node, node.getVolumeSets().get("SSD"), plan); + + final int dnIndex = 0; + final int planVersion = 0; // So far we support only one version. + DataNode dataNode = cluster.getDataNodes().get(dnIndex); + String planHash = DigestUtils.sha512Hex(plan.toJson()); + + // Since submitDiskBalancerPlan is not implemented yet, it throws an + // Exception, this will be modified with the actual implementation. + thrown.expect(DiskbalancerException.class); + dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson()); + + } +}