HDFS-9681. DiskBalancer: Add QueryPlan implementation. (Contributed by Anu Engineer)

Arpit Agarwal 2016-02-24 16:49:30 -08:00
parent 2b1b2faf76
commit e646c2eb50
8 changed files with 251 additions and 46 deletions

View File

@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBa
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBalancerSettingResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
@@ -392,10 +393,14 @@ public class ClientDatanodeProtocolTranslatorPB implements
QueryPlanStatusRequestProto.newBuilder().build();
QueryPlanStatusResponseProto response =
rpcProxy.queryDiskBalancerPlan(NULL_CONTROLLER, request);
return new DiskBalancerWorkStatus(response.hasResult() ?
response.getResult() : 0,
DiskBalancerWorkStatus.Result result = Result.NO_PLAN;
if (response.hasResult()) {
result = DiskBalancerWorkStatus.Result.values()[
response.getResult()];
}
return new DiskBalancerWorkStatus(result,
response.hasPlanID() ? response.getPlanID() : null,
response.hasStatus() ? response.getStatus() : null,
response.hasCurrentStatus() ? response.getCurrentStatus() : null);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
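
In short, the translator now maps the optional uint32 on the wire to the new Result enum, falling back to NO_PLAN when the field is absent. A minimal sketch of that decoding as a hypothetical standalone helper (not part of the patch; it adds a bounds check the translator itself omits, since the server only ever sends codes 0-3):

    import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;

    // Hypothetical helper mirroring the decode above; absent or out-of-range
    // wire values fall back to NO_PLAN instead of throwing.
    static Result decodeResult(boolean hasResult, int wireValue) {
      Result[] all = Result.values();
      if (!hasResult || wireValue < 0 || wireValue >= all.length) {
        return Result.NO_PLAN;
      }
      return all[wireValue];
    }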

View File

@@ -19,8 +19,17 @@
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.codehaus.jackson.map.ObjectMapper;
import static org.codehaus.jackson.map.type.TypeFactory.defaultInstance;
import java.io.IOException;
import java.util.List;
import java.util.LinkedList;
/**
* Helper class that reports how much work has been done by the node.
@@ -28,33 +37,69 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class DiskBalancerWorkStatus {
private final int result;
private final String planID;
private final String status;
private final String currentState;
private final List<DiskBalancerWorkEntry> currentState;
private Result result;
private String planID;
/**
* Constructs a default workStatus Object.
*/
public DiskBalancerWorkStatus() {
this.currentState = new LinkedList<>();
}
/**
* Constructs a workStatus Object.
*
* @param result - Result
* @param planID - Plan ID
*/
public DiskBalancerWorkStatus(Result result, String planID) {
this();
this.result = result;
this.planID = planID;
}
/**
* Constructs a workStatus Object.
*
* @param result - Result
* @param planID - Plan ID
* @param status - Current Status
* @param currentState - Current State
*/
public DiskBalancerWorkStatus(int result, String planID, String status,
String currentState) {
public DiskBalancerWorkStatus(Result result, String planID,
List<DiskBalancerWorkEntry> currentState) {
this.result = result;
this.planID = planID;
this.status = status;
this.currentState = currentState;
}
/**
* Constructs a workStatus Object.
*
* @param result - Result
* @param planID - Plan ID
* @param currentState - List of WorkEntries.
*/
public DiskBalancerWorkStatus(Result result, String planID,
String currentState) throws IOException {
this.result = result;
this.planID = planID;
ObjectMapper mapper = new ObjectMapper();
this.currentState = mapper.readValue(currentState,
defaultInstance().constructCollectionType(
List.class, DiskBalancerWorkEntry.class));
}
/**
* Returns result.
*
* @return Result
*/
public int getResult() {
public Result getResult() {
return result;
}
@@ -67,21 +112,136 @@ public class DiskBalancerWorkStatus {
return planID;
}
/**
* Returns Status.
*
* @return String
*/
public String getStatus() {
return status;
}
/**
* Gets current Status.
*
* @return - List of DiskBalancerWorkEntry
*/
public String getCurrentState() {
public List<DiskBalancerWorkEntry> getCurrentState() {
return currentState;
}
/**
* Return current state as a string.
*
* @throws IOException
**/
public String getCurrentStateString() throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(currentState);
}
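
Together with the String-based constructor above, this method gives the work entries a JSON round trip over the RPC boundary. Roughly, using the Jackson 1.x classes this file already imports (a sketch; status stands for any populated DiskBalancerWorkStatus, and exception handling is elided):

    ObjectMapper mapper = new ObjectMapper();
    // Datanode side: flatten the entries into a JSON array string.
    String json = mapper.writeValueAsString(status.getCurrentState());
    // Client side: rebuild the typed list from that string.
    List<DiskBalancerWorkEntry> entries = mapper.readValue(json,
        defaultInstance().constructCollectionType(
            List.class, DiskBalancerWorkEntry.class));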
/**
* Adds a new work entry to the list.
*
* @param entry - DiskBalancerWorkEntry
*/
public void addWorkEntry(DiskBalancerWorkEntry entry) {
Preconditions.checkNotNull(entry);
currentState.add(entry);
}
/** Various result values. **/
public enum Result {
NO_PLAN(0),
PLAN_UNDER_PROGRESS(1),
PLAN_DONE(2),
PLAN_CANCELLED(3);
private int result;
private Result(int result) {
this.result = result;
}
/**
* Get int value of result.
*
* @return int
*/
public int getIntResult() {
return result;
}
}
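
The constants are declared in the same order as their integer codes, so ordinal() and getIntResult() agree; the client-side translator depends on this when it indexes Result.values() with the wire integer. A quick sanity check (hypothetical snippet):

    for (Result r : Result.values()) {
      // Holds for NO_PLAN(0) through PLAN_CANCELLED(3).
      assert r.ordinal() == r.getIntResult();
    }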
/**
* A class that is used to report each work item that we are working on. This
* class describes the Source, Destination and how much data has already been
* moved, errors encountered, etc. This is useful for the disk balancer stats
* as well as the queryStatus RPC.
*/
public static class DiskBalancerWorkEntry {
private String sourcePath;
private String destPath;
private DiskBalancerWorkItem workItem;
/**
* Constructs a Work Entry class.
*
* @param sourcePath - Source Path where we are moving data from.
* @param destPath - Destination path to where we are moving data to.
* @param workItem - Current work status of this move.
*/
public DiskBalancerWorkEntry(String sourcePath, String destPath,
DiskBalancerWorkItem workItem) {
this.sourcePath = sourcePath;
this.destPath = destPath;
this.workItem = workItem;
}
/**
* Returns the source path.
*
* @return - Source path
*/
public String getSourcePath() {
return sourcePath;
}
/**
* Sets the Source Path.
*
* @param sourcePath - Volume Path.
*/
public void setSourcePath(String sourcePath) {
this.sourcePath = sourcePath;
}
/**
* Gets the Destination path.
*
* @return - Path
*/
public String getDestPath() {
return destPath;
}
/**
* Sets the destination path.
*
* @param destPath - Path
*/
public void setDestPath(String destPath) {
this.destPath = destPath;
}
/**
* Gets the current status of work for these volumes.
*
* @return - Work Item
*/
public DiskBalancerWorkItem getWorkItem() {
return workItem;
}
/**
* Sets the work item.
*
* @param workItem - sets the work item information
*/
public void setWorkItem(DiskBalancerWorkItem workItem) {
this.workItem = workItem;
}
}
}

View File

@@ -194,9 +194,8 @@ message QueryPlanStatusRequestProto {
*/
message QueryPlanStatusResponseProto {
optional uint32 result = 1;
optional string status = 2;
optional string planID = 3;
optional string currentStatus = 4;
optional string planID = 2;
optional string currentStatus = 3;
}
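
Note that removing the status field renumbers planID (3 to 2) and currentStatus (4 to 3). Protobuf identifies fields by tag number on the wire, so renumbering like this is only safe while client and server are upgraded in lockstep, which holds here because both translators ship in the same source tree on this feature branch.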
/**

View File

@@ -24,6 +24,9 @@ HDFS-1312 Change Log
HDFS-9647. DiskBalancer: Add getRuntimeSettings. (Anu Engineer via
Arpit Agarwal)
HDFS-9671skBalancer: SubmitPlan implementation. (Anu Engineer via
HDFS-9671. DiskBalancer: SubmitPlan implementation. (Anu Engineer via
Arpit Agarwal)
HDFS-9681. DiskBalancer: Add QueryPlan implementation. (Anu Engineer via
Arpit Agarwal)

View File

@@ -296,10 +296,9 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
DiskBalancerWorkStatus result = impl.queryDiskBalancerPlan();
return QueryPlanStatusResponseProto
.newBuilder()
.setResult(result.getResult())
.setResult(result.getResult().getIntResult())
.setPlanID(result.getPlanID())
.setStatus(result.getStatus())
.setCurrentStatus(result.getCurrentState())
.setCurrentStatus(result.getCurrentStateString())
.build();
} catch (Exception e) {
throw new ServiceException(e);

View File

@@ -3345,11 +3345,15 @@ public class DataNode extends ReconfigurableBase
DiskBalancerException.Result.INTERNAL_ERROR);
}
/**
* Returns the status of current or last executed work plan.
* @return DiskBalancerWorkStatus.
* @throws IOException
*/
@Override
public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException {
checkSuperuserPrivilege();
throw new DiskBalancerException("Not Implemented",
DiskBalancerException.Result.INTERNAL_ERROR);
return this.diskBalancer.queryWorkStatus();
}
/**

View File

@@ -25,6 +25,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.DiskBalancerWorkEntry;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
@@ -68,6 +70,7 @@ public class DiskBalancer {
private ExecutorService scheduler;
private Future future;
private String planID;
private DiskBalancerWorkStatus.Result currentResult;
/**
* Constructs a Disk Balancer object. This object takes care of reading a
@@ -79,6 +82,7 @@
*/
public DiskBalancer(String dataNodeUUID,
Configuration conf, BlockMover blockMover) {
this.currentResult = Result.NO_PLAN;
this.blockMover = blockMover;
this.dataset = this.blockMover.getDataset();
this.dataNodeUUID = dataNodeUUID;
@@ -97,6 +101,7 @@
lock.lock();
try {
this.isDiskBalancerEnabled = false;
this.currentResult = Result.NO_PLAN;
if ((this.future != null) && (!this.future.isDone())) {
this.blockMover.setExitFlag();
shutdownExecutor();
@@ -151,12 +156,46 @@
verifyPlan(planID, planVersion, plan, bandwidth, force);
createWorkPlan(nodePlan);
this.planID = planID;
this.currentResult = Result.PLAN_UNDER_PROGRESS;
executePlan();
} finally {
lock.unlock();
}
}
/**
* Returns the Current Work Status of a submitted Plan.
*
* @return DiskBalancerWorkStatus.
* @throws DiskBalancerException
*/
public DiskBalancerWorkStatus queryWorkStatus() throws DiskBalancerException {
lock.lock();
try {
checkDiskBalancerEnabled();
// if we had a plan in progress, check if it is finished.
if (this.currentResult == Result.PLAN_UNDER_PROGRESS &&
this.future != null &&
this.future.isDone()) {
this.currentResult = Result.PLAN_DONE;
}
DiskBalancerWorkStatus status =
new DiskBalancerWorkStatus(this.currentResult, this.planID);
for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
workMap.entrySet()) {
DiskBalancerWorkEntry workEntry = new DiskBalancerWorkEntry(
entry.getKey().getSource().getBasePath(),
entry.getKey().getDest().getBasePath(),
entry.getValue());
status.addWorkEntry(workEntry);
}
return status;
} finally {
lock.unlock();
}
}
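
A caller can poll this method (or the queryDiskBalancerPlan() RPC that wraps it) until the plan settles. A minimal sketch, assuming a DataNode reference as in the RPC test below (exception handling elided):

    // Hypothetical polling loop: wait for a submitted plan to finish,
    // then print the per-volume work entries as JSON.
    DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan();
    while (status.getResult() == Result.PLAN_UNDER_PROGRESS) {
      Thread.sleep(1000L);  // poll once per second
      status = dataNode.queryDiskBalancerPlan();
    }
    System.out.println(status.getCurrentStateString());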
/**
* Throws if Disk balancer is disabled.
*

View File

@@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
@@ -36,6 +37,9 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_UNDER_PROGRESS;
public class TestDiskBalancerRPC {
@Rule
public ExpectedException thrown = ExpectedException.none();
@@ -134,28 +138,20 @@
Assert.assertEquals(cluster.getDataNodes().size(),
diskBalancerCluster.getNodes().size());
diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(0);
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID(
dataNode.getDatanodeUuid());
GreedyPlanner planner = new GreedyPlanner(10.0f, node);
NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
());
planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
final int planVersion = 0; // So far we support only one version.
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
final int planVersion = 1; // So far we support only one version.
String planHash = DigestUtils.sha512Hex(plan.toJson());
// Since submitDiskBalancerPlan is not implemented yet, it throws an
// Exception, this will be modified with the actual implementation.
try {
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
} catch (DiskBalancerException ex) {
// Let us ignore this for time being.
}
// TODO : This will be fixed when we have implementation for this
// function in server side.
thrown.expect(DiskBalancerException.class);
dataNode.queryDiskBalancerPlan();
DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan();
Assert.assertTrue(status.getResult() == PLAN_UNDER_PROGRESS ||
status.getResult() == PLAN_DONE);
}
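
The disjunction in the assertion is deliberate: the plan executes on a background thread, so by the time queryDiskBalancerPlan() returns the work may still be in flight (PLAN_UNDER_PROGRESS) or already complete (PLAN_DONE); queryWorkStatus() promotes the result to PLAN_DONE once the future reports done.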
@Test