HDFS-10681. DiskBalancer: query command should report Plan file path apart from PlanID. (Manoj Govindassamy via lei)

This commit is contained in:
Lei Xu 2016-08-09 15:59:22 -07:00
parent cc48251bfd
commit 9c6a4383ca
12 changed files with 88 additions and 51 deletions

View File

@@ -168,8 +168,9 @@ public interface ClientDatanodeProtocol {
/**
* Submit a disk balancer plan for execution.
*/
void submitDiskBalancerPlan(String planID, long planVersion, String plan,
boolean skipDateCheck) throws IOException;
void submitDiskBalancerPlan(String planID, long planVersion, String planFile,
String planData, boolean skipDateCheck)
throws IOException;
/**
* Cancel an executing plan.

View File

@@ -342,19 +342,22 @@ public class ClientDatanodeProtocolTranslatorPB implements
* local copies of these plans.
* @param planVersion - The data format of the plans - for the future, not
* used now.
* @param plan - Actual plan.
* @param planFile - Plan file name
* @param planData - Actual plan data in json format
* @param skipDateCheck - Skips the date check.
* @throws IOException
*/
@Override
public void submitDiskBalancerPlan(String planID, long planVersion,
String plan, boolean skipDateCheck) throws IOException {
String planFile, String planData, boolean skipDateCheck)
throws IOException {
try {
SubmitDiskBalancerPlanRequestProto request =
SubmitDiskBalancerPlanRequestProto.newBuilder()
.setPlanID(planID)
.setPlanVersion(planVersion)
.setPlan(plan)
.setPlanFile(planFile)
.setPlan(planData)
.setIgnoreDateCheck(skipDateCheck)
.build();
rpcProxy.submitDiskBalancerPlan(NULL_CONTROLLER, request);
@@ -399,6 +402,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
return new DiskBalancerWorkStatus(result,
response.hasPlanID() ? response.getPlanID() : null,
response.hasPlanFile() ? response.getPlanFile() : null,
response.hasCurrentStatus() ? response.getCurrentStatus() : null);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);

View File

@@ -42,6 +42,7 @@ public class DiskBalancerWorkStatus {
private final List<DiskBalancerWorkEntry> currentState;
private Result result;
private String planID;
private String planFile;
/**
* Constructs a default workStatus Object.
@@ -55,11 +56,13 @@
*
* @param result - int
* @param planID - Plan ID
* @param planFile - Plan file name
*/
public DiskBalancerWorkStatus(Result result, String planID) {
public DiskBalancerWorkStatus(Result result, String planID, String planFile) {
this();
this.result = result;
this.planID = planID;
this.planFile = planFile;
}
/**
@@ -84,10 +87,11 @@
* @param planID - Plan ID
* @param currentState - List of WorkEntries.
*/
public DiskBalancerWorkStatus(Result result, String planID,
public DiskBalancerWorkStatus(Result result, String planID, String planFile,
String currentState) throws IOException {
this.result = result;
this.planID = planID;
this.planFile = planFile;
ObjectMapper mapper = new ObjectMapper();
this.currentState = mapper.readValue(currentState,
defaultInstance().constructCollectionType(
@@ -113,6 +117,15 @@
return planID;
}
/**
* Returns planFile.
*
* @return String
*/
public String getPlanFile() {
return planFile;
}
/**
* Gets current Status.
*

View File

@@ -154,10 +154,11 @@ message GetBalancerBandwidthResponseProto {
* balancer plan to a data node.
*/
message SubmitDiskBalancerPlanRequestProto {
required string planID = 1; // A hash of the plan like SHA512
required string plan = 2; // Json String that describes the plan
optional uint64 planVersion = 3; // Plan version number
optional bool ignoreDateCheck = 4; // Ignore date checks on this plan.
required string planID = 1; // A hash of the plan like SHA512
required string plan = 2; // Plan file data in Json format
optional uint64 planVersion = 3; // Plan version number
optional bool ignoreDateCheck = 4; // Ignore date checks on this plan.
required string planFile = 5; // Plan file path
}
/**
@@ -196,6 +197,7 @@ message QueryPlanStatusResponseProto {
optional uint32 result = 1;
optional string planID = 2;
optional string currentStatus = 3;
optional string planFile = 4;
}
/**

View File

@@ -255,6 +255,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
try {
impl.submitDiskBalancerPlan(request.getPlanID(),
request.hasPlanVersion() ? request.getPlanVersion() : 1,
request.hasPlanFile() ? request.getPlanFile() : "",
request.getPlan(),
request.hasIgnoreDateCheck() ? request.getIgnoreDateCheck() : false);
SubmitDiskBalancerPlanResponseProto response =
@@ -298,6 +299,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
.newBuilder()
.setResult(result.getResult().getIntResult())
.setPlanID(result.getPlanID())
.setPlanFile(result.getPlanFile())
.setCurrentStatus(result.currentStateString())
.build();
} catch (Exception e) {

View File

@@ -3399,16 +3399,18 @@ public class DataNode extends ReconfigurableBase
* @param planID - Hash value of the plan.
* @param planVersion - Plan version, reserved for future use. We have only
* version 1 now.
* @param plan - Actual plan
* @param planFile - Plan file name
* @param planData - Actual plan data in json format
* @throws IOException
*/
@Override
public void submitDiskBalancerPlan(String planID,
long planVersion, String plan, boolean skipDateCheck) throws IOException {
public void submitDiskBalancerPlan(String planID, long planVersion,
String planFile, String planData, boolean skipDateCheck)
throws IOException {
checkSuperuserPrivilege();
// TODO : Support force option
this.diskBalancer.submitPlan(planID, planVersion, plan, skipDateCheck);
this.diskBalancer.submitPlan(planID, planVersion, planFile, planData,
skipDateCheck);
}
/**

View File

@@ -85,6 +85,7 @@ public class DiskBalancer {
private ExecutorService scheduler;
private Future future;
private String planID;
private String planFile;
private DiskBalancerWorkStatus.Result currentResult;
private long bandwidth;
@@ -106,6 +107,7 @@
lock = new ReentrantLock();
workMap = new ConcurrentHashMap<>();
this.planID = ""; // to keep protobuf happy.
this.planFile = ""; // to keep protobuf happy.
this.isDiskBalancerEnabled = conf.getBoolean(
DFSConfigKeys.DFS_DISK_BALANCER_ENABLED,
DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT);
@@ -155,15 +157,16 @@
* Takes a client submitted plan and converts into a set of work items that
* can be executed by the blockMover.
*
* @param planID - A SHA512 of the plan string
* @param planId - A SHA512 of the plan string
* @param planVersion - version of the plan string - for future use.
* @param plan - Actual Plan
* @param planFileName - Plan file name
* @param planData - Plan data in json format
* @param force - Skip some validations and execute the plan file.
* @throws DiskBalancerException
*/
public void submitPlan(String planID, long planVersion, String plan,
boolean force) throws DiskBalancerException {
public void submitPlan(String planId, long planVersion, String planFileName,
String planData, boolean force)
throws DiskBalancerException {
lock.lock();
try {
checkDiskBalancerEnabled();
@@ -172,9 +175,10 @@
throw new DiskBalancerException("Executing another plan",
DiskBalancerException.Result.PLAN_ALREADY_IN_PROGRESS);
}
NodePlan nodePlan = verifyPlan(planID, planVersion, plan, force);
NodePlan nodePlan = verifyPlan(planId, planVersion, planData, force);
createWorkPlan(nodePlan);
this.planID = planID;
this.planID = planId;
this.planFile = planFileName;
this.currentResult = Result.PLAN_UNDER_PROGRESS;
executePlan();
} finally {
@@ -200,7 +204,8 @@
}
DiskBalancerWorkStatus status =
new DiskBalancerWorkStatus(this.currentResult, this.planID);
new DiskBalancerWorkStatus(this.currentResult, this.planID,
this.planFile);
for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
workMap.entrySet()) {
DiskBalancerWorkEntry workEntry = new DiskBalancerWorkEntry(
@@ -485,7 +490,8 @@
@Override
public void run() {
Thread.currentThread().setName("DiskBalancerThread");
LOG.info("Executing Disk balancer plan. Plan ID - " + planID);
LOG.info("Executing Disk balancer plan. Plan File: {}, Plan ID: {}",
planFile, planID);
try {
for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
workMap.entrySet()) {

View File

@@ -68,16 +68,18 @@ public class ExecuteCommand extends Command {
try (FSDataInputStream plan = open(planFile)) {
planData = IOUtils.toString(plan);
}
submitPlan(planData);
submitPlan(planFile, planData);
}
/**
* Submits plan to a given data node.
*
* @param planData - PlanData Json String.
* @param planFile - Plan file name
* @param planData - Plan data in json format
* @throws IOException
*/
private void submitPlan(String planData) throws IOException {
private void submitPlan(final String planFile, final String planData)
throws IOException {
Preconditions.checkNotNull(planData);
NodePlan plan = NodePlan.parseJson(planData);
String dataNodeAddress = plan.getNodeName() + ":" + plan.getPort();
@@ -85,8 +87,9 @@
ClientDatanodeProtocol dataNode = getDataNodeProxy(dataNodeAddress);
String planHash = DigestUtils.sha512Hex(planData);
try {
// TODO : Support skipping date check.
dataNode.submitDiskBalancerPlan(planHash, DiskBalancer.PLAN_VERSION,
planData, false); // TODO : Support skipping date check.
planFile, planData, false);
} catch (DiskBalancerException ex) {
LOG.error("Submitting plan on {} failed. Result: {}, Message: {}",
plan.getNodeName(), ex.getResult().toString(), ex.getMessage());
@@ -94,8 +97,6 @@
}
}
/**
* Gets extended help for this command.
*/

View File

@@ -74,8 +74,10 @@ public class QueryCommand extends Command {
ClientDatanodeProtocol dataNode = getDataNodeProxy(nodeAddress);
try {
DiskBalancerWorkStatus workStatus = dataNode.queryDiskBalancerPlan();
System.out.printf("Plan ID: %s %nResult: %s%n", workStatus.getPlanID(),
workStatus.getResult().toString());
System.out.printf("Plan File: %s%nPlan ID: %s%nResult: %s%n",
workStatus.getPlanFile(),
workStatus.getPlanID(),
workStatus.getResult().toString());
if (cmd.hasOption(DiskBalancer.VERBOSE)) {
System.out.printf("%s", workStatus.currentStateString());

View File

@@ -61,6 +61,8 @@ import static org.junit.Assert.assertTrue;
*/
public class TestDiskBalancer {
private static final String PLAN_FILE = "/system/current.plan.json";
@Test
public void testDiskBalancerNameNodeConnectivity() throws Exception {
Configuration conf = new HdfsConfiguration();
@@ -195,7 +197,7 @@
plan.getVolumeSetPlans().get(0).setTolerancePercent(10);
// Submit the plan and wait till the execution is done.
newDN.submitDiskBalancerPlan(planID, 1, planJson, false);
newDN.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson, false);
String jmxString = newDN.getDiskBalancerStatus();
assertNotNull(jmxString);
DiskBalancerWorkStatus status =
@@ -307,7 +309,7 @@
String planJson = plan.toJson();
String planID = DigestUtils.sha512Hex(planJson);
dataNode.submitDiskBalancerPlan(planID, 1, planJson, false);
dataNode.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson, false);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override

View File

@@ -60,6 +60,7 @@ public class TestDiskBalancerRPC {
@Rule
public ExpectedException thrown = ExpectedException.none();
private static final String PLAN_FILE = "/system/current.plan.json";
private MiniDFSCluster cluster;
private Configuration conf;
@@ -85,8 +86,8 @@
String planHash = rpcTestHelper.getPlanHash();
int planVersion = rpcTestHelper.getPlanVersion();
NodePlan plan = rpcTestHelper.getPlan();
dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(),
false);
dataNode.submitDiskBalancerPlan(planHash, planVersion, PLAN_FILE,
plan.toJson(), false);
}
@Test
@@ -101,8 +102,8 @@
NodePlan plan = rpcTestHelper.getPlan();
thrown.expect(DiskBalancerException.class);
thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN_HASH));
dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(),
false);
dataNode.submitDiskBalancerPlan(planHash, planVersion, PLAN_FILE,
plan.toJson(), false);
}
@Test
@@ -115,8 +116,8 @@
NodePlan plan = rpcTestHelper.getPlan();
thrown.expect(DiskBalancerException.class);
thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN_VERSION));
dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(),
false);
dataNode.submitDiskBalancerPlan(planHash, planVersion, PLAN_FILE,
plan.toJson(), false);
}
@Test
@@ -128,7 +129,7 @@
NodePlan plan = rpcTestHelper.getPlan();
thrown.expect(DiskBalancerException.class);
thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN));
dataNode.submitDiskBalancerPlan(planHash, planVersion, "",
dataNode.submitDiskBalancerPlan(planHash, planVersion, "", "",
false);
}
@@ -139,8 +140,8 @@
String planHash = rpcTestHelper.getPlanHash();
int planVersion = rpcTestHelper.getPlanVersion();
NodePlan plan = rpcTestHelper.getPlan();
dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(),
false);
dataNode.submitDiskBalancerPlan(planHash, planVersion, PLAN_FILE,
plan.toJson(), false);
dataNode.cancelDiskBalancePlan(planHash);
}
@@ -203,8 +204,8 @@
int planVersion = rpcTestHelper.getPlanVersion();
NodePlan plan = rpcTestHelper.getPlan();
dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(),
false);
dataNode.submitDiskBalancerPlan(planHash, planVersion, PLAN_FILE,
plan.toJson(), false);
String bandwidthString = dataNode.getDiskBalancerSetting(
DiskBalancerConstants.DISKBALANCER_BANDWIDTH);
long value = Long.decode(bandwidthString);
@@ -219,8 +220,8 @@
int planVersion = rpcTestHelper.getPlanVersion();
NodePlan plan = rpcTestHelper.getPlan();
dataNode.submitDiskBalancerPlan(planHash, planVersion, plan.toJson(),
false);
dataNode.submitDiskBalancerPlan(planHash, planVersion, PLAN_FILE,
plan.toJson(), false);
DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan();
Assert.assertTrue(status.getResult() == PLAN_UNDER_PROGRESS ||
status.getResult() == PLAN_DONE);

View File

@@ -68,6 +68,7 @@ public class TestDiskBalancerWithMockMover {
@Rule
public ExpectedException thrown = ExpectedException.none();
private static final String PLAN_FILE = "/system/current.plan.json";
private MiniDFSCluster cluster;
private String sourceName;
private String destName;
@@ -125,7 +126,7 @@
int version) throws IOException {
String planJson = plan.toJson();
String planID = DigestUtils.sha512Hex(planJson);
balancer.submitPlan(planID, version, planJson, false);
balancer.submitPlan(planID, version, PLAN_FILE, planJson, false);
}
private void executeSubmitPlan(NodePlan plan, DiskBalancer balancer)
@@ -219,7 +220,7 @@
thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
.Result.INVALID_PLAN));
balancer.submitPlan(planID, 1, null, false);
balancer.submitPlan(planID, 1, "no-plan-file.json", null, false);
}
@Test
@@ -238,7 +239,7 @@
thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
.Result.INVALID_PLAN_HASH));
balancer.submitPlan(planID.replace(planID.charAt(0), repChar),
1, planJson, false);
1, PLAN_FILE, planJson, false);
}