HDFS-9709. DiskBalancer : Add tests for disk balancer using a Mock Mover class. Contributed by Anu Engineer.
This commit is contained in:
parent
9be9703716
commit
6c606bf5c8
|
@ -40,7 +40,11 @@ import java.io.IOException;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -106,6 +110,7 @@ public class DiskBalancer {
|
||||||
this.isDiskBalancerEnabled = false;
|
this.isDiskBalancerEnabled = false;
|
||||||
this.currentResult = Result.NO_PLAN;
|
this.currentResult = Result.NO_PLAN;
|
||||||
if ((this.future != null) && (!this.future.isDone())) {
|
if ((this.future != null) && (!this.future.isDone())) {
|
||||||
|
this.currentResult = Result.PLAN_CANCELLED;
|
||||||
this.blockMover.setExitFlag();
|
this.blockMover.setExitFlag();
|
||||||
shutdownExecutor();
|
shutdownExecutor();
|
||||||
}
|
}
|
||||||
|
@ -120,9 +125,9 @@ public class DiskBalancer {
|
||||||
private void shutdownExecutor() {
|
private void shutdownExecutor() {
|
||||||
scheduler.shutdown();
|
scheduler.shutdown();
|
||||||
try {
|
try {
|
||||||
if(!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
|
if(!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
|
||||||
scheduler.shutdownNow();
|
scheduler.shutdownNow();
|
||||||
if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
|
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
|
||||||
LOG.error("Disk Balancer : Scheduler did not terminate.");
|
LOG.error("Disk Balancer : Scheduler did not terminate.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -218,6 +223,7 @@ public class DiskBalancer {
|
||||||
if (!this.future.isDone()) {
|
if (!this.future.isDone()) {
|
||||||
this.blockMover.setExitFlag();
|
this.blockMover.setExitFlag();
|
||||||
shutdownExecutor();
|
shutdownExecutor();
|
||||||
|
this.currentResult = Result.PLAN_CANCELLED;
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
@ -537,7 +543,7 @@ public class DiskBalancer {
|
||||||
/**
|
/**
|
||||||
* Holds references to actual volumes that we will be operating against.
|
* Holds references to actual volumes that we will be operating against.
|
||||||
*/
|
*/
|
||||||
static class VolumePair {
|
public static class VolumePair {
|
||||||
private final FsVolumeSpi source;
|
private final FsVolumeSpi source;
|
||||||
private final FsVolumeSpi dest;
|
private final FsVolumeSpi dest;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer;
|
||||||
|
|
||||||
|
import org.hamcrest.Description;
|
||||||
|
import org.hamcrest.TypeSafeMatcher;
|
||||||
|
|
||||||
|
public class DiskBalancerResultVerifier
|
||||||
|
extends TypeSafeMatcher<DiskBalancerException> {
|
||||||
|
private final DiskBalancerException.Result expectedResult;
|
||||||
|
|
||||||
|
DiskBalancerResultVerifier(DiskBalancerException.Result expectedResult) {
|
||||||
|
this.expectedResult = expectedResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean matchesSafely(DiskBalancerException exception) {
|
||||||
|
return (this.expectedResult == exception.getResult());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void describeTo(Description description) {
|
||||||
|
description.appendText("expects Result: ")
|
||||||
|
.appendValue(this.expectedResult);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -90,8 +90,7 @@ public class TestDiskBalancerRPC {
|
||||||
int planVersion = rpcTestHelper.getPlanVersion();
|
int planVersion = rpcTestHelper.getPlanVersion();
|
||||||
NodePlan plan = rpcTestHelper.getPlan();
|
NodePlan plan = rpcTestHelper.getPlan();
|
||||||
thrown.expect(DiskBalancerException.class);
|
thrown.expect(DiskBalancerException.class);
|
||||||
thrown.expect(new
|
thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN_HASH));
|
||||||
ResultVerifier(Result.INVALID_PLAN_HASH));
|
|
||||||
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
|
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,8 +103,7 @@ public class TestDiskBalancerRPC {
|
||||||
planVersion++;
|
planVersion++;
|
||||||
NodePlan plan = rpcTestHelper.getPlan();
|
NodePlan plan = rpcTestHelper.getPlan();
|
||||||
thrown.expect(DiskBalancerException.class);
|
thrown.expect(DiskBalancerException.class);
|
||||||
thrown.expect(new
|
thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN_VERSION));
|
||||||
ResultVerifier(Result.INVALID_PLAN_VERSION));
|
|
||||||
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
|
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,8 +115,7 @@ public class TestDiskBalancerRPC {
|
||||||
int planVersion = rpcTestHelper.getPlanVersion();
|
int planVersion = rpcTestHelper.getPlanVersion();
|
||||||
NodePlan plan = rpcTestHelper.getPlan();
|
NodePlan plan = rpcTestHelper.getPlan();
|
||||||
thrown.expect(DiskBalancerException.class);
|
thrown.expect(DiskBalancerException.class);
|
||||||
thrown.expect(new
|
thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN));
|
||||||
ResultVerifier(Result.INVALID_PLAN));
|
|
||||||
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, "");
|
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,8 +140,7 @@ public class TestDiskBalancerRPC {
|
||||||
planHash = String.valueOf(hashArray);
|
planHash = String.valueOf(hashArray);
|
||||||
NodePlan plan = rpcTestHelper.getPlan();
|
NodePlan plan = rpcTestHelper.getPlan();
|
||||||
thrown.expect(DiskBalancerException.class);
|
thrown.expect(DiskBalancerException.class);
|
||||||
thrown.expect(new
|
thrown.expect(new DiskBalancerResultVerifier(Result.NO_SUCH_PLAN));
|
||||||
ResultVerifier(Result.NO_SUCH_PLAN));
|
|
||||||
dataNode.cancelDiskBalancePlan(planHash);
|
dataNode.cancelDiskBalancePlan(planHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,8 +151,7 @@ public class TestDiskBalancerRPC {
|
||||||
String planHash = "";
|
String planHash = "";
|
||||||
NodePlan plan = rpcTestHelper.getPlan();
|
NodePlan plan = rpcTestHelper.getPlan();
|
||||||
thrown.expect(DiskBalancerException.class);
|
thrown.expect(DiskBalancerException.class);
|
||||||
thrown.expect(new
|
thrown.expect(new DiskBalancerResultVerifier(Result.NO_SUCH_PLAN));
|
||||||
ResultVerifier(Result.NO_SUCH_PLAN));
|
|
||||||
dataNode.cancelDiskBalancePlan(planHash);
|
dataNode.cancelDiskBalancePlan(planHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -182,8 +177,7 @@ public class TestDiskBalancerRPC {
|
||||||
final String invalidSetting = "invalidSetting";
|
final String invalidSetting = "invalidSetting";
|
||||||
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
|
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
|
||||||
thrown.expect(DiskBalancerException.class);
|
thrown.expect(DiskBalancerException.class);
|
||||||
thrown.expect(new
|
thrown.expect(new DiskBalancerResultVerifier(Result.UNKNOWN_KEY));
|
||||||
ResultVerifier(Result.UNKNOWN_KEY));
|
|
||||||
dataNode.getDiskBalancerSetting(invalidSetting);
|
dataNode.getDiskBalancerSetting(invalidSetting);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -274,25 +268,4 @@ public class TestDiskBalancerRPC {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class ResultVerifier
|
|
||||||
extends TypeSafeMatcher<DiskBalancerException> {
|
|
||||||
private final DiskBalancerException.Result expectedResult;
|
|
||||||
|
|
||||||
ResultVerifier(DiskBalancerException.Result expectedResult){
|
|
||||||
this.expectedResult = expectedResult;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean matchesSafely(DiskBalancerException exception) {
|
|
||||||
return (this.expectedResult == exception.getResult());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void describeTo(Description description) {
|
|
||||||
description.appendText("expects Result: ")
|
|
||||||
.appendValue(this.expectedResult);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,570 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.diskbalancer;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DiskBalancer;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkItem;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.MoveStep;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
|
||||||
|
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
public class TestDiskBalancerWithMockMover {
|
||||||
|
static final Log LOG = LogFactory.getLog(TestDiskBalancerWithMockMover.class);
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public ExpectedException thrown = ExpectedException.none();
|
||||||
|
|
||||||
|
MiniDFSCluster cluster;
|
||||||
|
String sourceName;
|
||||||
|
String destName;
|
||||||
|
String sourceUUID;
|
||||||
|
String destUUID;
|
||||||
|
String nodeID;
|
||||||
|
DataNode dataNode;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks that we return the right error if diskbalancer is not enabled.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDiskBalancerDisabled() throws Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, false);
|
||||||
|
restartDataNode();
|
||||||
|
|
||||||
|
TestMover blockMover = new TestMover(cluster.getDataNodes()
|
||||||
|
.get(0).getFSDataset());
|
||||||
|
|
||||||
|
DiskBalancer balancer = new DiskBalancerBuilder(conf)
|
||||||
|
.setMover(blockMover)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
thrown.expect(DiskBalancerException.class);
|
||||||
|
thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
|
||||||
|
.Result.DISK_BALANCER_NOT_ENABLED));
|
||||||
|
|
||||||
|
balancer.queryWorkStatus();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks that Enable flag works correctly.
|
||||||
|
*
|
||||||
|
* @throws DiskBalancerException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDiskBalancerEnabled() throws DiskBalancerException {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
|
||||||
|
|
||||||
|
TestMover blockMover = new TestMover(cluster.getDataNodes()
|
||||||
|
.get(0).getFSDataset());
|
||||||
|
|
||||||
|
DiskBalancer balancer = new DiskBalancerBuilder(conf)
|
||||||
|
.setMover(blockMover)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
DiskBalancerWorkStatus status = balancer.queryWorkStatus();
|
||||||
|
assertEquals(NO_PLAN, status.getResult());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void executeSubmitPlan(NodePlan plan, DiskBalancer balancer,
|
||||||
|
int version) throws IOException {
|
||||||
|
String planJson = plan.toJson();
|
||||||
|
String planID = DigestUtils.sha512Hex(planJson);
|
||||||
|
balancer.submitPlan(planID, version, planJson, 10, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void executeSubmitPlan(NodePlan plan, DiskBalancer balancer)
|
||||||
|
throws IOException {
|
||||||
|
executeSubmitPlan(plan, balancer, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test a second submit plan fails.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testResubmitDiskBalancerPlan() throws Exception {
|
||||||
|
MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
|
||||||
|
NodePlan plan = mockMoverHelper.getPlan();
|
||||||
|
DiskBalancer balancer = mockMoverHelper.getBalancer();
|
||||||
|
|
||||||
|
// ask block mover to get stuck in copy block
|
||||||
|
mockMoverHelper.getBlockMover().setSleep();
|
||||||
|
executeSubmitPlan(plan, balancer);
|
||||||
|
thrown.expect(DiskBalancerException.class);
|
||||||
|
thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
|
||||||
|
.Result.PLAN_ALREADY_IN_PROGRESS));
|
||||||
|
executeSubmitPlan(plan, balancer);
|
||||||
|
|
||||||
|
// Not needed but this is the cleanup step.
|
||||||
|
mockMoverHelper.getBlockMover().clearSleep();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubmitDiskBalancerPlan() throws Exception {
|
||||||
|
MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
|
||||||
|
NodePlan plan = mockMoverHelper.getPlan();
|
||||||
|
DiskBalancer balancer = mockMoverHelper.getBalancer();
|
||||||
|
|
||||||
|
executeSubmitPlan(plan, balancer);
|
||||||
|
int counter = 0;
|
||||||
|
while ((balancer.queryWorkStatus().getResult() != PLAN_DONE) &&
|
||||||
|
(counter < 3)) {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
counter++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Asserts that submit plan caused an execution in the background.
|
||||||
|
assertTrue(mockMoverHelper.getBlockMover().getRunCount() == 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubmitWithOlderPlan() throws Exception {
|
||||||
|
final long MILLISECOND_IN_AN_HOUR = 1000 * 60 * 60L;
|
||||||
|
MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
|
||||||
|
NodePlan plan = mockMoverHelper.getPlan();
|
||||||
|
DiskBalancer balancer = mockMoverHelper.getBalancer();
|
||||||
|
|
||||||
|
plan.setTimeStamp(Time.now() - (32 * MILLISECOND_IN_AN_HOUR));
|
||||||
|
thrown.expect(DiskBalancerException.class);
|
||||||
|
thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
|
||||||
|
.Result.OLD_PLAN_SUBMITTED));
|
||||||
|
executeSubmitPlan(plan, balancer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubmitWithOldInvalidVersion() throws Exception {
|
||||||
|
MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
|
||||||
|
NodePlan plan = mockMoverHelper.getPlan();
|
||||||
|
DiskBalancer balancer = mockMoverHelper.getBalancer();
|
||||||
|
|
||||||
|
thrown.expect(DiskBalancerException.class);
|
||||||
|
thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
|
||||||
|
.Result.INVALID_PLAN_VERSION));
|
||||||
|
|
||||||
|
// Plan version is invalid -- there is no version 0.
|
||||||
|
executeSubmitPlan(plan, balancer, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubmitWithNullPlan() throws Exception {
|
||||||
|
MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
|
||||||
|
NodePlan plan = mockMoverHelper.getPlan();
|
||||||
|
DiskBalancer balancer = mockMoverHelper.getBalancer();
|
||||||
|
String planJson = plan.toJson();
|
||||||
|
String planID = DigestUtils.sha512Hex(planJson);
|
||||||
|
|
||||||
|
thrown.expect(DiskBalancerException.class);
|
||||||
|
thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
|
||||||
|
.Result.INVALID_PLAN));
|
||||||
|
|
||||||
|
balancer.submitPlan(planID, 1, null, 10, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubmitWithInvalidHash() throws Exception {
|
||||||
|
MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
|
||||||
|
NodePlan plan = mockMoverHelper.getPlan();
|
||||||
|
DiskBalancer balancer = mockMoverHelper.getBalancer();
|
||||||
|
|
||||||
|
|
||||||
|
String planJson = plan.toJson();
|
||||||
|
String planID = DigestUtils.sha512Hex(planJson);
|
||||||
|
char repChar = planID.charAt(0);
|
||||||
|
repChar++;
|
||||||
|
|
||||||
|
thrown.expect(DiskBalancerException.class);
|
||||||
|
thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
|
||||||
|
.Result.INVALID_PLAN_HASH));
|
||||||
|
balancer.submitPlan(planID.replace(planID.charAt(0), repChar),
|
||||||
|
1, planJson, 10, false);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test Cancel Plan.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCancelDiskBalancerPlan() throws Exception {
|
||||||
|
MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
|
||||||
|
NodePlan plan = mockMoverHelper.getPlan();
|
||||||
|
DiskBalancer balancer = mockMoverHelper.getBalancer();
|
||||||
|
|
||||||
|
|
||||||
|
// ask block mover to delay execution
|
||||||
|
mockMoverHelper.getBlockMover().setSleep();
|
||||||
|
executeSubmitPlan(plan, balancer);
|
||||||
|
|
||||||
|
|
||||||
|
String planJson = plan.toJson();
|
||||||
|
String planID = DigestUtils.sha512Hex(planJson);
|
||||||
|
balancer.cancelPlan(planID);
|
||||||
|
|
||||||
|
DiskBalancerWorkStatus status = balancer.queryWorkStatus();
|
||||||
|
assertEquals(DiskBalancerWorkStatus.Result.PLAN_CANCELLED,
|
||||||
|
status.getResult());
|
||||||
|
|
||||||
|
|
||||||
|
executeSubmitPlan(plan, balancer);
|
||||||
|
|
||||||
|
// Send a Wrong cancellation request.
|
||||||
|
char first = planID.charAt(0);
|
||||||
|
first++;
|
||||||
|
thrown.expect(DiskBalancerException.class);
|
||||||
|
thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
|
||||||
|
.Result.NO_SUCH_PLAN));
|
||||||
|
balancer.cancelPlan(planID.replace(planID.charAt(0), first));
|
||||||
|
|
||||||
|
// Now cancel the real one
|
||||||
|
balancer.cancelPlan(planID);
|
||||||
|
mockMoverHelper.getBlockMover().clearSleep(); // unblock mover.
|
||||||
|
|
||||||
|
status = balancer.queryWorkStatus();
|
||||||
|
assertEquals(DiskBalancerWorkStatus.Result.PLAN_CANCELLED,
|
||||||
|
status.getResult());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
final int NUM_STORAGES_PER_DN = 2;
|
||||||
|
cluster = new MiniDFSCluster
|
||||||
|
.Builder(conf).numDataNodes(3)
|
||||||
|
.storagesPerDatanode(NUM_STORAGES_PER_DN)
|
||||||
|
.build();
|
||||||
|
cluster.waitActive();
|
||||||
|
dataNode = cluster.getDataNodes().get(0);
|
||||||
|
FsDatasetSpi.FsVolumeReferences references = dataNode.getFSDataset()
|
||||||
|
.getFsVolumeReferences();
|
||||||
|
|
||||||
|
nodeID = dataNode.getDatanodeUuid();
|
||||||
|
sourceName = references.get(0).getBasePath();
|
||||||
|
destName = references.get(1).getBasePath();
|
||||||
|
sourceUUID = references.get(0).getStorageID();
|
||||||
|
destUUID = references.get(1).getStorageID();
|
||||||
|
references.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void restartDataNode() throws IOException {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.restartDataNode(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allows us to control mover class for test purposes.
|
||||||
|
*/
|
||||||
|
public static class TestMover implements DiskBalancer.BlockMover {
|
||||||
|
|
||||||
|
private AtomicBoolean shouldRun;
|
||||||
|
private FsDatasetSpi dataset;
|
||||||
|
private Integer runCount;
|
||||||
|
private volatile boolean sleepInCopyBlocks;
|
||||||
|
private long delay;
|
||||||
|
|
||||||
|
public TestMover(FsDatasetSpi dataset) {
|
||||||
|
this.dataset = dataset;
|
||||||
|
this.shouldRun = new AtomicBoolean(false);
|
||||||
|
this.runCount = new Integer(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSleep() {
|
||||||
|
sleepInCopyBlocks = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void clearSleep() {
|
||||||
|
sleepInCopyBlocks = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDelay(long milliseconds) {
|
||||||
|
this.delay = milliseconds;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copies blocks from a set of volumes.
|
||||||
|
*
|
||||||
|
* @param pair - Source and Destination Volumes.
|
||||||
|
* @param item - Number of bytes to move from volumes.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void copyBlocks(DiskBalancer.VolumePair pair,
|
||||||
|
DiskBalancerWorkItem item) {
|
||||||
|
|
||||||
|
try {
|
||||||
|
// get stuck if we are asked to sleep.
|
||||||
|
while (sleepInCopyBlocks) {
|
||||||
|
if (!this.shouldRun()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Thread.sleep(10);
|
||||||
|
}
|
||||||
|
if (delay > 0) {
|
||||||
|
Thread.sleep(delay);
|
||||||
|
}
|
||||||
|
synchronized (runCount) {
|
||||||
|
if (shouldRun()) {
|
||||||
|
runCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
// A failure here can be safely ignored with no impact for tests.
|
||||||
|
LOG.error(ex.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets copyblocks into runnable state.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void setRunnable() {
|
||||||
|
this.shouldRun.set(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signals copy block to exit.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void setExitFlag() {
|
||||||
|
this.shouldRun.set(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the shouldRun boolean flag.
|
||||||
|
*/
|
||||||
|
public boolean shouldRun() {
|
||||||
|
return this.shouldRun.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FsDatasetSpi getDataset() {
|
||||||
|
return this.dataset;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getRunCount() {
|
||||||
|
synchronized (runCount) {
|
||||||
|
LOG.info("Run count : " + runCount.intValue());
|
||||||
|
return runCount.intValue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class MockMoverHelper {
|
||||||
|
private DiskBalancer balancer;
|
||||||
|
private NodePlan plan;
|
||||||
|
private TestMover blockMover;
|
||||||
|
|
||||||
|
public DiskBalancer getBalancer() {
|
||||||
|
return balancer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public NodePlan getPlan() {
|
||||||
|
return plan;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestMover getBlockMover() {
|
||||||
|
return blockMover;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MockMoverHelper invoke() throws Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
|
||||||
|
restartDataNode();
|
||||||
|
|
||||||
|
blockMover = new TestMover(dataNode.getFSDataset());
|
||||||
|
blockMover.setRunnable();
|
||||||
|
|
||||||
|
balancer = new DiskBalancerBuilder(conf)
|
||||||
|
.setMover(blockMover)
|
||||||
|
.setNodeID(nodeID)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
DiskBalancerCluster diskBalancerCluster = new DiskBalancerClusterBuilder()
|
||||||
|
.setClusterSource("/diskBalancer/data-cluster-3node-3disk.json")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
plan = new PlanBuilder(diskBalancerCluster, nodeID)
|
||||||
|
.setPathMap(sourceName, destName)
|
||||||
|
.setUUIDMap(sourceUUID, destUUID)
|
||||||
|
.build();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class DiskBalancerBuilder {
|
||||||
|
private TestMover blockMover;
|
||||||
|
private Configuration conf;
|
||||||
|
private String nodeID;
|
||||||
|
|
||||||
|
public DiskBalancerBuilder(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DiskBalancerBuilder setNodeID(String nodeID) {
|
||||||
|
this.nodeID = nodeID;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DiskBalancerBuilder setConf(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DiskBalancerBuilder setMover(TestMover mover) {
|
||||||
|
this.blockMover = mover;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DiskBalancerBuilder setRunnable() {
|
||||||
|
blockMover.setRunnable();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DiskBalancer build() {
|
||||||
|
Preconditions.checkNotNull(blockMover);
|
||||||
|
return new DiskBalancer(nodeID, conf,
|
||||||
|
blockMover);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class DiskBalancerClusterBuilder {
|
||||||
|
private String jsonFilePath;
|
||||||
|
private Configuration conf;
|
||||||
|
|
||||||
|
public DiskBalancerClusterBuilder setConf(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DiskBalancerClusterBuilder setClusterSource(String jsonFilePath)
|
||||||
|
throws Exception {
|
||||||
|
this.jsonFilePath = jsonFilePath;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DiskBalancerCluster build() throws Exception {
|
||||||
|
DiskBalancerCluster diskBalancerCluster;
|
||||||
|
URI clusterJson = getClass().getResource(jsonFilePath).toURI();
|
||||||
|
ClusterConnector jsonConnector =
|
||||||
|
ConnectorFactory.getCluster(clusterJson, conf);
|
||||||
|
diskBalancerCluster = new DiskBalancerCluster(jsonConnector);
|
||||||
|
diskBalancerCluster.readClusterInfo();
|
||||||
|
diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
|
||||||
|
return diskBalancerCluster;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class PlanBuilder {
|
||||||
|
private String sourcePath;
|
||||||
|
private String destPath;
|
||||||
|
private String sourceUUID;
|
||||||
|
private String destUUID;
|
||||||
|
private DiskBalancerCluster balancerCluster;
|
||||||
|
private String nodeID;
|
||||||
|
|
||||||
|
public PlanBuilder(DiskBalancerCluster balancerCluster, String nodeID) {
|
||||||
|
this.balancerCluster = balancerCluster;
|
||||||
|
this.nodeID = nodeID;
|
||||||
|
}
|
||||||
|
|
||||||
|
public PlanBuilder setPathMap(String sourcePath, String destPath) {
|
||||||
|
this.sourcePath = sourcePath;
|
||||||
|
this.destPath = destPath;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public PlanBuilder setUUIDMap(String sourceUUID, String destUUID) {
|
||||||
|
this.sourceUUID = sourceUUID;
|
||||||
|
this.destUUID = destUUID;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public NodePlan build() throws Exception {
|
||||||
|
final int dnIndex = 0;
|
||||||
|
Preconditions.checkNotNull(balancerCluster);
|
||||||
|
Preconditions.checkState(nodeID.length() > 0);
|
||||||
|
|
||||||
|
DiskBalancerDataNode node = balancerCluster.getNodes().get(dnIndex);
|
||||||
|
node.setDataNodeUUID(nodeID);
|
||||||
|
GreedyPlanner planner = new GreedyPlanner(10.0f, node);
|
||||||
|
NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
|
||||||
|
());
|
||||||
|
planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
|
||||||
|
setVolumeNames(plan);
|
||||||
|
return plan;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setVolumeNames(NodePlan plan) {
|
||||||
|
Iterator<Step> iter = plan.getVolumeSetPlans().iterator();
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
MoveStep nextStep = (MoveStep) iter.next();
|
||||||
|
nextStep.getSourceVolume().setPath(sourcePath);
|
||||||
|
nextStep.getSourceVolume().setUuid(sourceUUID);
|
||||||
|
nextStep.getDestinationVolume().setPath(destPath);
|
||||||
|
nextStep.getDestinationVolume().setUuid(destUUID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue