HDFS-9588. DiskBalancer: Add submitDiskbalancer RPC. (Anu Engineer via Arpit Agarwal)

Arpit Agarwal 2016-01-11 20:31:18 -08:00
parent 5724a10316
commit 7100c0da35
8 changed files with 289 additions and 0 deletions

View File

@@ -163,4 +163,11 @@ public interface ClientDatanodeProtocol {
* @return balancer bandwidth
*/
long getBalancerBandwidth() throws IOException;
/**
 * Submit a disk balancer plan for execution.
 * @param planID - SHA-512 hex digest of the plan being submitted.
 * @param planVersion - Plan format version; reserved for future use.
 * @param bandwidth - Max disk bandwidth to consume; zero means use the
 *                    value defined in the datanode configuration.
 * @param plan - The plan, serialized as a JSON string.
 * @throws IOException if the datanode cannot accept the plan.
 */
void submitDiskBalancerPlan(String planID, long planVersion, long bandwidth,
String plan) throws IOException;
}
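
To make the new interface method concrete, here is a minimal caller-side sketch. It is not part of this patch: the helper class is hypothetical, the SHA-512 hashing mirrors the DigestUtils.sha512Hex call in the test added below, and planVersion 0 / bandwidth 0 follow the defaults used elsewhere in this change.

import java.io.IOException;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;

/** Hypothetical helper illustrating a call to the new RPC method. */
public class SubmitPlanSketch {
  static void submit(ClientDatanodeProtocol datanode, String planJson)
      throws IOException {
    // The plan ID is the SHA-512 hex digest of the plan JSON, matching
    // what TestDiskBalancerRPC below computes with DigestUtils.sha512Hex.
    String planID = DigestUtils.sha512Hex(planJson);
    long planVersion = 0; // only one plan format exists so far
    long bandwidth = 0;   // zero: use the datanode's configured value
    datanode.submitDiskBalancerPlan(planID, planVersion, bandwidth, planJson);
  }
}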

View File

@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetRe
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanRequestProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -326,4 +327,35 @@ public class ClientDatanodeProtocolTranslatorPB implements
throw ProtobufHelper.getRemoteException(e);
}
}
/**
 * Submits a disk balancer plan to the datanode.
 * @param planID - SHA-512 hex string of the plan that is submitted. This
 *                 is used by clients when they want to find local copies
 *                 of these plans.
 * @param planVersion - The data format version of the plan; reserved for
 *                      future use, not used now.
 * @param bandwidth - Maximum disk bandwidth to consume; setting this value
 *                    to zero allows the datanode to use the value defined
 *                    in its configuration.
 * @param plan - Actual plan, as a JSON string.
 * @throws IOException if the RPC fails or the datanode rejects the plan.
 */
@Override
public void submitDiskBalancerPlan(String planID, long planVersion,
long bandwidth, String plan) throws IOException {
try {
SubmitDiskBalancerPlanRequestProto request =
SubmitDiskBalancerPlanRequestProto.newBuilder()
.setPlanID(planID)
.setPlanVersion(planVersion)
.setMaxDiskBandwidth(bandwidth)
.setPlan(plan)
.build();
rpcProxy.submitDiskBalancerPlan(NULL_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

View File

@@ -149,6 +149,23 @@ message GetBalancerBandwidthResponseProto {
required uint64 bandwidth = 1;
}
/**
* This message allows a client to submit a disk
* balancer plan to a data node.
*/
message SubmitDiskBalancerPlanRequestProto {
required string planID = 1; // Hash of the plan, e.g. a SHA-512 hex string
required string plan = 2; // JSON string that describes the plan
optional uint64 planVersion = 3; // Plan format version number
optional uint64 maxDiskBandwidth = 4; // Optional bandwidth cap; zero means
                                      // use the configured default
}
/**
* Response from the DataNode on Plan Submit request
*/
message SubmitDiskBalancerPlanResponseProto {
}
/**
* Protocol used from client to the Datanode.
* See the request and response for details of rpc call.
@@ -207,4 +224,10 @@ service ClientDatanodeProtocolService {
*/
rpc getBalancerBandwidth(GetBalancerBandwidthRequestProto)
returns(GetBalancerBandwidthResponseProto);
/**
* Submit a disk balancer plan for execution.
*/
rpc submitDiskBalancerPlan(SubmitDiskBalancerPlanRequestProto)
returns (SubmitDiskBalancerPlanResponseProto);
}

View File

@@ -13,3 +13,6 @@ HDFS-1312 Change Log
HDFS-9469. DiskBalancer: Add Planner. (Anu Engineer via Arpit Agarwal)
HDFS-9588. DiskBalancer: Add submitDiskbalancer RPC. (Anu Engineer via
Arpit Agarwal)

View File

@@ -47,6 +47,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.Start
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.SubmitDiskBalancerPlanResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -232,4 +234,29 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
return GetBalancerBandwidthResponseProto.newBuilder()
.setBandwidth(bandwidth).build();
}
/**
 * Submit a disk balancer plan for execution.
 * @param controller - RpcController (unused)
 * @param request - The submit-plan request
 * @return An empty response proto on success
 * @throws ServiceException wrapping any error raised by the datanode
 */
@Override
public SubmitDiskBalancerPlanResponseProto submitDiskBalancerPlan(
RpcController controller, SubmitDiskBalancerPlanRequestProto request)
throws ServiceException {
try {
impl.submitDiskBalancerPlan(request.getPlanID(),
request.hasPlanVersion() ? request.getPlanVersion() : 0,
request.hasMaxDiskBandwidth() ? request.getMaxDiskBandwidth() : 0,
request.getPlan());
SubmitDiskBalancerPlanResponseProto response =
SubmitDiskBalancerPlanResponseProto.newBuilder()
.build();
return response;
} catch (Exception e) {
throw new ServiceException(e);
}
}
}

View File

@@ -169,6 +169,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskbalancerException;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -3286,4 +3287,27 @@ public class DataNode extends ReconfigurableBase
public Tracer getTracer() {
return tracer;
}
/**
 * Allows submission of a disk balancer job.
 * @param planID - Hash value of the plan.
 * @param planVersion - Plan version, reserved for future use. We have only
 *                      version 1 now.
 * @param bandwidth - Max disk bandwidth to use, 0 means use the value
 *                    defined in the configuration.
 * @param plan - Actual plan.
 * @throws IOException on failure; the current stub always throws
 *                     {@link DiskbalancerException}.
 */
@Override
public void submitDiskBalancerPlan(String planID,
long planVersion, long bandwidth, String plan) throws IOException {
// TODO : This will be replaced with the actual implementation later.
// For now we throw DiskbalancerException rather than
// NotImplementedException to indicate that the eventual disk balancer
// code will also throw DiskbalancerException.
throw new DiskbalancerException("Not Implemented", 0);
}
}

View File

@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.server.diskbalancer;
import java.io.IOException;
/**
* Disk Balancer Exceptions.
*/
public class DiskbalancerException extends IOException {
private final int result;
/**
 * Constructs a {@code DiskbalancerException} with the specified detail
 * message.
 *
 * @param message The detail message (which is saved for later retrieval by
 *                the {@link #getMessage()} method)
 * @param result  The result code associated with this failure
 */
public DiskbalancerException(String message, int result) {
super(message);
this.result = result;
}
/**
 * Constructs a {@code DiskbalancerException} with the specified detail
 * message and cause.
 * <p>
 * Note that the detail message associated with {@code cause} is
 * <i>not</i> automatically incorporated into this exception's detail
 * message.
 *
 * @param message The detail message (which is saved for later retrieval by
 *                the {@link #getMessage()} method)
 * @param cause   The cause (which is saved for later retrieval by the
 *                {@link #getCause()} method). (A null value is permitted,
 *                and indicates that the cause is nonexistent or unknown.)
 * @param result  The result code associated with this failure
 */
public DiskbalancerException(String message, Throwable cause, int result) {
super(message, cause);
this.result = result;
}
/**
 * Constructs a {@code DiskbalancerException} with the specified cause and
 * a detail message of {@code (cause==null ? null : cause.toString())}
 * (which typically contains the class and detail message of
 * {@code cause}). This constructor is useful for exceptions that are
 * little more than wrappers for other throwables.
 *
 * @param cause  The cause (which is saved for later retrieval by the
 *               {@link #getCause()} method). (A null value is permitted,
 *               and indicates that the cause is nonexistent or unknown.)
 * @param result The result code associated with this failure
 */
public DiskbalancerException(Throwable cause, int result) {
super(cause);
this.result = result;
}
/**
 * Returns the result code associated with this exception.
 * @return the result code
 */
public int getResult() {
return result;
}
}
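
A short usage sketch may help here; it is hypothetical and not part of the patch. It mirrors the in-process call the test below makes on a DataNode, and shows how a caller can read the machine-readable code that getResult() exposes alongside the usual IOException message.

import java.io.IOException;

import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskbalancerException;

/** Hypothetical caller showing how the result code can be inspected. */
public class DiskbalancerErrorSketch {
  static void submitAndReport(DataNode dn, String planID, String planJson) {
    try {
      // In-process call, as in TestDiskBalancerRPC below.
      dn.submitDiskBalancerPlan(planID, 0, 0, planJson);
    } catch (DiskbalancerException e) {
      // getResult() returns the int code set by the thrower; the DataNode
      // stub in this patch always uses 0.
      System.err.println("Plan rejected: " + e.getMessage()
          + " (code " + e.getResult() + ")");
    } catch (IOException e) {
      System.err.println("Submit failed: " + e);
    }
  }
}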

View File

@@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.server.diskbalancer;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.net.URI;
/**
 * Tests the new disk balancer RPC on the datanode.
 */
public class TestDiskBalancerRPC {
@Rule
public ExpectedException thrown = ExpectedException.none();
private MiniDFSCluster cluster;
@Before
public void setUp() throws Exception {
Configuration conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.waitActive();
}
@After
public void tearDown() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testSubmitTestRpc() throws Exception {
URI clusterJson = getClass()
.getResource("/diskBalancer/data-cluster-3node-3disk.json").toURI();
ClusterConnector jsonConnector = ConnectorFactory.getCluster(clusterJson,
null);
DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(jsonConnector);
diskBalancerCluster.readClusterInfo();
Assert.assertEquals(3, diskBalancerCluster.getNodes().size());
diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0);
GreedyPlanner planner = new GreedyPlanner(10.0f, node);
NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort());
planner.balanceVolumeSet(node, node.getVolumeSets().get("SSD"), plan);
final int dnIndex = 0;
final int planVersion = 0; // So far we support only one version.
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
String planHash = DigestUtils.sha512Hex(plan.toJson());
// Since submitDiskBalancerPlan is not implemented yet, it throws an
// exception; this will be updated once the actual implementation lands.
thrown.expect(DiskbalancerException.class);
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
}
}