HDFS-10808. DiskBalancer does not execute multi-steps plan-redux. Contributed by Anu Engineer.

Anu Engineer 2016-09-09 15:00:39 -07:00
parent cba973f036
commit bee9f57f5c
3 changed files with 370 additions and 232 deletions

@@ -501,14 +501,11 @@ public class DiskBalancer {
       public void run() {
         Thread.currentThread().setName("DiskBalancerThread");
         LOG.info("Executing Disk balancer plan. Plan File: {}, Plan ID: {}",
             planFile, planID);
-        try {
-          for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
-              workMap.entrySet()) {
+        for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
+            workMap.entrySet()) {
+          blockMover.setRunnable();
           blockMover.copyBlocks(entry.getKey(), entry.getValue());
-          }
-        } finally {
-          blockMover.setExitFlag();
         }
       }
     });
@@ -857,8 +854,8 @@ public class DiskBalancer {
     if (item.getErrorCount() >= getMaxError(item)) {
       item.setErrMsg("Error count exceeded.");
-      LOG.info("Maximum error count exceeded. Error count: {} Max error:{} "
-          , item.getErrorCount(), item.getMaxDiskErrors());
+      LOG.info("Maximum error count exceeded. Error count: {} Max error:{} ",
+          item.getErrorCount(), item.getMaxDiskErrors());
     }
     return null;
@@ -962,7 +959,8 @@ public class DiskBalancer {
           LOG.error("Exceeded the max error count. source {}, dest: {} " +
               "error count: {}", source.getBasePath(),
               dest.getBasePath(), item.getErrorCount());
-          break;
+          this.setExitFlag();
+          continue;
         }
         // Check for the block tolerance constraint.
@@ -971,7 +969,8 @@ public class DiskBalancer {
               "blocks.",
               source.getBasePath(), dest.getBasePath(),
               item.getBytesCopied(), item.getBlocksCopied());
-          break;
+          this.setExitFlag();
+          continue;
         }
         ExtendedBlock block = getNextBlock(poolIters, item);
@@ -979,7 +978,8 @@ public class DiskBalancer {
         if (block == null) {
           LOG.error("No source blocks, exiting the copy. Source: {}, " +
               "dest:{}", source.getBasePath(), dest.getBasePath());
-          break;
+          this.setExitFlag();
+          continue;
         }
         // check if someone told us exit, treat this as an interruption
@@ -987,7 +987,7 @@ public class DiskBalancer {
         // for the thread, since both getNextBlock and moveBlocAcrossVolume
         // can take some time.
         if (!shouldRun()) {
-          break;
+          continue;
         }
         long timeUsed;
@@ -1006,7 +1006,8 @@ public class DiskBalancer {
           LOG.error("Destination volume: {} does not have enough space to" +
               " accommodate a block. Block Size: {} Exiting from" +
               " copyBlocks.", dest.getBasePath(), block.getNumBytes());
-          break;
+          this.setExitFlag();
+          continue;
         }
         LOG.debug("Moved block with size {} from {} to {}",

@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
@@ -37,19 +36,18 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
-import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
-    .DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
@@ -62,6 +60,7 @@ import static org.junit.Assert.assertTrue;
 public class TestDiskBalancer {
 
   private static final String PLAN_FILE = "/system/current.plan.json";
+  static final Logger LOG = LoggerFactory.getLogger(TestDiskBalancer.class);
 
   @Test
   public void testDiskBalancerNameNodeConnectivity() throws Exception {
@@ -110,227 +109,77 @@ public class TestDiskBalancer {
    */
   @Test
   public void testDiskBalancerEndToEnd() throws Exception {
     Configuration conf = new HdfsConfiguration();
-    final int defaultBlockSize = 100;
     conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultBlockSize);
-    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, defaultBlockSize);
-    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
-    final int numDatanodes = 1;
-    final String fileName = "/tmp.txt";
-    final Path filePath = new Path(fileName);
-    final int blocks = 100;
-    final int blocksSize = 1024;
-    final int fileLen = blocks * blocksSize;
-
-    // Write a file and restart the cluster
-    long[] capacities = new long[]{defaultBlockSize * 2 * fileLen,
-        defaultBlockSize * 2 * fileLen};
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(numDatanodes)
-        .storageCapacities(capacities)
-        .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
-        .storagesPerDatanode(2)
+    final int blockCount = 100;
+    final int blockSize = 1024;
+    final int diskCount = 2;
+    final int dataNodeCount = 1;
+    final int dataNodeIndex = 0;
+    final int sourceDiskIndex = 0;
+
+    MiniDFSCluster cluster = new ClusterBuilder()
+        .setBlockCount(blockCount)
+        .setBlockSize(blockSize)
+        .setDiskCount(diskCount)
+        .setNumDatanodes(dataNodeCount)
+        .setConf(conf)
         .build();
-    FsVolumeImpl source = null;
-    FsVolumeImpl dest = null;
     try {
-      cluster.waitActive();
-      Random r = new Random();
-      FileSystem fs = cluster.getFileSystem(0);
-      TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
-          numDatanodes - 1);
-
-      DFSTestUtil.waitReplication(fs, filePath, (short) 1);
-      cluster.restartDataNodes();
-      cluster.waitActive();
-
-      // Get the data node and move all data to one disk.
-      DataNode dnNode = cluster.getDataNodes().get(numDatanodes - 1);
-      try (FsDatasetSpi.FsVolumeReferences refs =
-               dnNode.getFSDataset().getFsVolumeReferences()) {
-        source = (FsVolumeImpl) refs.get(0);
-        dest = (FsVolumeImpl) refs.get(1);
-        assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
-        DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
-            source, dest);
-        assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
-      }
-
-      cluster.restartDataNodes();
-      cluster.waitActive();
-
-      // Start up a disk balancer and read the cluster info.
-      final DataNode newDN = cluster.getDataNodes().get(numDatanodes - 1);
-      ClusterConnector nameNodeConnector =
-          ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
-
-      DiskBalancerCluster diskBalancerCluster =
-          new DiskBalancerCluster(nameNodeConnector);
-      diskBalancerCluster.readClusterInfo();
-      List<DiskBalancerDataNode> nodesToProcess = new LinkedList<>();
-
-      // Rewrite the capacity in the model to show that disks need
-      // re-balancing.
-      setVolumeCapacity(diskBalancerCluster, defaultBlockSize * 2 * fileLen,
-          "DISK");
-      // Pick a node to process.
-      nodesToProcess.add(diskBalancerCluster.getNodeByUUID(dnNode
-          .getDatanodeUuid()));
-      diskBalancerCluster.setNodesToProcess(nodesToProcess);
-
-      // Compute a plan.
-      List<NodePlan> clusterplan = diskBalancerCluster.computePlan(0.0f);
-
-      // Now we must have a plan,since the node is imbalanced and we
-      // asked the disk balancer to create a plan.
-      assertTrue(clusterplan.size() == 1);
-
-      NodePlan plan = clusterplan.get(0);
-      plan.setNodeUUID(dnNode.getDatanodeUuid());
-      plan.setTimeStamp(Time.now());
-
-      String planJson = plan.toJson();
-      String planID = DigestUtils.shaHex(planJson);
-      assertNotNull(plan.getVolumeSetPlans());
-      assertTrue(plan.getVolumeSetPlans().size() > 0);
-      plan.getVolumeSetPlans().get(0).setTolerancePercent(10);
-
-      // Submit the plan and wait till the execution is done.
-      newDN.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson, false);
-      String jmxString = newDN.getDiskBalancerStatus();
-      assertNotNull(jmxString);
-      DiskBalancerWorkStatus status =
-          DiskBalancerWorkStatus.parseJson(jmxString);
-      DiskBalancerWorkStatus realStatus = newDN.queryDiskBalancerPlan();
-      assertEquals(realStatus.getPlanID(), status.getPlanID());
-
-      GenericTestUtils.waitFor(new Supplier<Boolean>() {
-        @Override
-        public Boolean get() {
-          try {
-            return newDN.queryDiskBalancerPlan().getResult() ==
-                DiskBalancerWorkStatus.Result.PLAN_DONE;
-          } catch (IOException ex) {
-            return false;
-          }
-        }
-      }, 1000, 100000);
-
-      //verify that it worked.
-      dnNode = cluster.getDataNodes().get(numDatanodes - 1);
-      assertEquals(dnNode.queryDiskBalancerPlan().getResult(),
-          DiskBalancerWorkStatus.Result.PLAN_DONE);
-      try (FsDatasetSpi.FsVolumeReferences refs =
-               dnNode.getFSDataset().getFsVolumeReferences()) {
-        source = (FsVolumeImpl) refs.get(0);
-        assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
-      }
-
-      // Tolerance
-      long delta = (plan.getVolumeSetPlans().get(0).getBytesToMove()
-          * 10) / 100;
-      assertTrue(
-          (DiskBalancerTestUtil.getBlockCount(source) *
-              defaultBlockSize + delta) >=
-              plan.getVolumeSetPlans().get(0).getBytesToMove());
+      DataMover dataMover = new DataMover(cluster, dataNodeIndex,
+          sourceDiskIndex, conf, blockSize, blockCount);
+      dataMover.moveDataToSourceDisk();
+      NodePlan plan = dataMover.generatePlan();
+      dataMover.executePlan(plan);
+      dataMover.verifyPlanExectionDone();
+      dataMover.verifyAllVolumesHaveData();
+      dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
     } finally {
       cluster.shutdown();
     }
   }
 
-  @Test(timeout=60000)
+  @Test
   public void testBalanceDataBetweenMultiplePairsOfVolumes()
       throws Exception {
     Configuration conf = new HdfsConfiguration();
-    final int DEFAULT_BLOCK_SIZE = 2048;
     conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
-    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
-    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
-    final int NUM_DATANODES = 1;
-    final long CAP = 512 * 1024;
-    final Path testFile = new Path("/testfile");
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(NUM_DATANODES)
-        .storageCapacities(new long[]{CAP, CAP, CAP, CAP})
-        .storagesPerDatanode(4)
+    final int blockCount = 1000;
+    final int blockSize = 1024;
+
+    // create 3 disks, that means we will have 2 plans
+    // Move Data from disk0->disk1 and disk0->disk2.
+    final int diskCount = 3;
+    final int dataNodeCount = 1;
+    final int dataNodeIndex = 0;
+    final int sourceDiskIndex = 0;
+
+    MiniDFSCluster cluster = new ClusterBuilder()
+        .setBlockCount(blockCount)
+        .setBlockSize(blockSize)
+        .setDiskCount(diskCount)
+        .setNumDatanodes(dataNodeCount)
+        .setConf(conf)
         .build();
     try {
-      cluster.waitActive();
-      DistributedFileSystem fs = cluster.getFileSystem();
-      TestBalancer.createFile(cluster, testFile, CAP, (short) 1, 0);
-
-      DFSTestUtil.waitReplication(fs, testFile, (short) 1);
-      DataNode dnNode = cluster.getDataNodes().get(0);
-      // Move data out of two volumes to make them empty.
-      try (FsDatasetSpi.FsVolumeReferences refs =
-               dnNode.getFSDataset().getFsVolumeReferences()) {
-        assertEquals(4, refs.size());
-        for (int i = 0; i < refs.size(); i += 2) {
-          FsVolumeImpl source = (FsVolumeImpl) refs.get(i);
-          FsVolumeImpl dest = (FsVolumeImpl) refs.get(i + 1);
-          assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
-          DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
-              source, dest);
-          assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
-        }
-      }
-
-      cluster.restartDataNodes();
-      cluster.waitActive();
-
-      // Start up a disk balancer and read the cluster info.
-      final DataNode dataNode = cluster.getDataNodes().get(0);
-      ClusterConnector nameNodeConnector =
-          ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
-
-      DiskBalancerCluster diskBalancerCluster =
-          new DiskBalancerCluster(nameNodeConnector);
-      diskBalancerCluster.readClusterInfo();
-      List<DiskBalancerDataNode> nodesToProcess = new LinkedList<>();
-      // Rewrite the capacity in the model to show that disks need
-      // re-balancing.
-      setVolumeCapacity(diskBalancerCluster, CAP, "DISK");
-
-      nodesToProcess.add(diskBalancerCluster.getNodeByUUID(
-          dataNode.getDatanodeUuid()));
-      diskBalancerCluster.setNodesToProcess(nodesToProcess);
-
-      // Compute a plan.
-      List<NodePlan> clusterPlan = diskBalancerCluster.computePlan(10.0f);
-
-      NodePlan plan = clusterPlan.get(0);
-      assertEquals(2, plan.getVolumeSetPlans().size());
-      plan.setNodeUUID(dnNode.getDatanodeUuid());
-      plan.setTimeStamp(Time.now());
-
-      String planJson = plan.toJson();
-      String planID = DigestUtils.shaHex(planJson);
-      dataNode.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson, false);
-
-      GenericTestUtils.waitFor(new Supplier<Boolean>() {
-        @Override
-        public Boolean get() {
-          try {
-            return dataNode.queryDiskBalancerPlan().getResult() ==
-                DiskBalancerWorkStatus.Result.PLAN_DONE;
-          } catch (IOException ex) {
-            return false;
-          }
-        }
-      }, 1000, 100000);
-
-      assertEquals(dataNode.queryDiskBalancerPlan().getResult(),
-          DiskBalancerWorkStatus.Result.PLAN_DONE);
-      try (FsDatasetSpi.FsVolumeReferences refs =
-               dataNode.getFSDataset().getFsVolumeReferences()) {
-        for (FsVolumeSpi vol : refs) {
-          assertTrue(DiskBalancerTestUtil.getBlockCount(vol) > 0);
-        }
-      }
+      DataMover dataMover = new DataMover(cluster, dataNodeIndex,
+          sourceDiskIndex, conf, blockSize, blockCount);
+      dataMover.moveDataToSourceDisk();
+      NodePlan plan = dataMover.generatePlan();
+
+      // 3 disks , The plan should move data both disks,
+      // so we must have 2 plan steps.
+      assertEquals(plan.getVolumeSetPlans().size(), 2);
+
+      dataMover.executePlan(plan);
+      dataMover.verifyPlanExectionDone();
+      dataMover.verifyAllVolumesHaveData();
+      dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
     } finally {
       cluster.shutdown();
     }
@@ -353,4 +202,293 @@ public class TestDiskBalancer {
       node.getVolumeSets().get(diskType).computeVolumeDataDensity();
     }
   }
+
+  /**
+   * Helper class that allows us to create different kinds of MiniDFSClusters
+   * and populate data.
+   */
+  static class ClusterBuilder {
+    private Configuration conf;
+    private int blockSize;
+    private int numDatanodes;
+    private int fileLen;
+    private int blockCount;
+    private int diskCount;
+
+    public ClusterBuilder setConf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public ClusterBuilder setBlockSize(int blockSize) {
+      this.blockSize = blockSize;
+      return this;
+    }
+
+    public ClusterBuilder setNumDatanodes(int datanodeCount) {
+      this.numDatanodes = datanodeCount;
+      return this;
+    }
+
+    public ClusterBuilder setBlockCount(int blockCount) {
+      this.blockCount = blockCount;
+      return this;
+    }
+
+    public ClusterBuilder setDiskCount(int diskCount) {
+      this.diskCount = diskCount;
+      return this;
+    }
+
+    private long[] getCapacities(int diskCount, int bSize, int fSize) {
+      Preconditions.checkState(diskCount > 0);
+      long[] capacities = new long[diskCount];
+      for (int x = 0; x < diskCount; x++) {
+        capacities[x] = diskCount * bSize * fSize * 2L;
+      }
+      return capacities;
+    }
+
+    private StorageType[] getStorageTypes(int diskCount) {
+      Preconditions.checkState(diskCount > 0);
+      StorageType[] array = new StorageType[diskCount];
+      for (int x = 0; x < diskCount; x++) {
+        array[x] = StorageType.DISK;
+      }
+      return array;
+    }
+
+    public MiniDFSCluster build() throws IOException, TimeoutException,
+        InterruptedException {
+      Preconditions.checkNotNull(this.conf);
+      Preconditions.checkState(blockSize > 0);
+      Preconditions.checkState(numDatanodes > 0);
+      fileLen = blockCount * blockSize;
+      Preconditions.checkState(fileLen > 0);
+      conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+      conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+      conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
+      conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+
+      final String fileName = "/tmp.txt";
+      Path filePath = new Path(fileName);
+      fileLen = blockCount * blockSize;
+
+      // Write a file and restart the cluster
+      MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(numDatanodes)
+          .storageCapacities(getCapacities(diskCount, blockSize, fileLen))
+          .storageTypes(getStorageTypes(diskCount))
+          .storagesPerDatanode(diskCount)
+          .build();
+      generateData(filePath, cluster);
+      cluster.restartDataNodes();
+      cluster.waitActive();
+      return cluster;
+    }
+
+    private void generateData(Path filePath, MiniDFSCluster cluster)
+        throws IOException, InterruptedException, TimeoutException {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem(0);
+      TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
+          numDatanodes - 1);
+      DFSTestUtil.waitReplication(fs, filePath, (short) 1);
+      cluster.restartDataNodes();
+      cluster.waitActive();
+    }
+  }
+
+  class DataMover {
+    private final MiniDFSCluster cluster;
+    private final int sourceDiskIndex;
+    private final int dataNodeIndex;
+    private final Configuration conf;
+    private final int blockCount;
+    private final int blockSize;
+    private DataNode node;
+
+    /**
+     * Constructs a DataMover class.
+     *
+     * @param cluster - MiniDFSCluster.
+     * @param dataNodeIndex - Datanode to operate against.
+     * @param sourceDiskIndex - source Disk Index.
+     */
+    public DataMover(MiniDFSCluster cluster, int dataNodeIndex, int
+        sourceDiskIndex, Configuration conf, int blockSize, int
+        blockCount) {
+      this.cluster = cluster;
+      this.dataNodeIndex = dataNodeIndex;
+      this.node = cluster.getDataNodes().get(dataNodeIndex);
+      this.sourceDiskIndex = sourceDiskIndex;
+      this.conf = conf;
+      this.blockCount = blockCount;
+      this.blockSize = blockSize;
+    }
+
+    /**
+     * Moves all data to a source disk to create disk imbalance so we can run a
+     * planner.
+     *
+     * @throws IOException
+     */
+    public void moveDataToSourceDisk() throws IOException {
+      moveAllDataToDestDisk(this.node, sourceDiskIndex);
+      cluster.restartDataNodes();
+      cluster.waitActive();
+    }
+
+    /**
+     * Moves all data in the data node to one disk.
+     *
+     * @param dataNode - Datanode
+     * @param destDiskindex - Index of the destination disk.
+     */
+    private void moveAllDataToDestDisk(DataNode dataNode, int destDiskindex)
+        throws IOException {
+      Preconditions.checkNotNull(dataNode);
+      Preconditions.checkState(destDiskindex >= 0);
+      try (FsDatasetSpi.FsVolumeReferences refs =
+               dataNode.getFSDataset().getFsVolumeReferences()) {
+        if (refs.size() <= destDiskindex) {
+          throw new IllegalArgumentException("Invalid Disk index.");
+        }
+        FsVolumeImpl dest = (FsVolumeImpl) refs.get(destDiskindex);
+        for (int x = 0; x < refs.size(); x++) {
+          if (x == destDiskindex) {
+            continue;
+          }
+          FsVolumeImpl source = (FsVolumeImpl) refs.get(x);
+          DiskBalancerTestUtil.moveAllDataToDestVolume(dataNode.getFSDataset(),
+              source, dest);
+        }
+      }
+    }
+
+    /**
+     * Generates a NodePlan for the datanode specified.
+     *
+     * @return NodePlan.
+     */
+    public NodePlan generatePlan() throws Exception {
+
+      // Start up a disk balancer and read the cluster info.
+      node = cluster.getDataNodes().get(dataNodeIndex);
+      ClusterConnector nameNodeConnector =
+          ConnectorFactory.getCluster(cluster.getFileSystem(dataNodeIndex)
+              .getUri(), conf);
+
+      DiskBalancerCluster diskBalancerCluster =
+          new DiskBalancerCluster(nameNodeConnector);
+      diskBalancerCluster.readClusterInfo();
+      List<DiskBalancerDataNode> nodesToProcess = new LinkedList<>();
+
+      // Rewrite the capacity in the model to show that disks need
+      // re-balancing.
+      setVolumeCapacity(diskBalancerCluster, blockSize * 2L * blockCount,
+          "DISK");
+      // Pick a node to process.
+      nodesToProcess.add(diskBalancerCluster.getNodeByUUID(
+          node.getDatanodeUuid()));
+      diskBalancerCluster.setNodesToProcess(nodesToProcess);
+
+      // Compute a plan.
+      List<NodePlan> clusterplan = diskBalancerCluster.computePlan(0.0f);
+
+      // Now we must have a plan,since the node is imbalanced and we
+      // asked the disk balancer to create a plan.
+      assertTrue(clusterplan.size() == 1);
+
+      NodePlan plan = clusterplan.get(0);
+      plan.setNodeUUID(node.getDatanodeUuid());
+      plan.setTimeStamp(Time.now());
+
+      assertNotNull(plan.getVolumeSetPlans());
+      assertTrue(plan.getVolumeSetPlans().size() > 0);
+      plan.getVolumeSetPlans().get(0).setTolerancePercent(10);
+      return plan;
+    }
+
+    /**
+     * Waits for a plan executing to finish.
+     */
+    public void executePlan(NodePlan plan) throws
+        IOException, TimeoutException, InterruptedException {
+
+      node = cluster.getDataNodes().get(dataNodeIndex);
+      String planJson = plan.toJson();
+      String planID = DigestUtils.shaHex(planJson);
+
+      // Submit the plan and wait till the execution is done.
+      node.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson,
+          false);
+      String jmxString = node.getDiskBalancerStatus();
+      assertNotNull(jmxString);
+      DiskBalancerWorkStatus status =
+          DiskBalancerWorkStatus.parseJson(jmxString);
+      DiskBalancerWorkStatus realStatus = node.queryDiskBalancerPlan();
+      assertEquals(realStatus.getPlanID(), status.getPlanID());
+
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            return node.queryDiskBalancerPlan().getResult() ==
+                DiskBalancerWorkStatus.Result.PLAN_DONE;
+          } catch (IOException ex) {
+            return false;
+          }
+        }
+      }, 1000, 100000);
+    }
+
+    /**
+     * Verifies the Plan Execution has been done.
+     */
+    public void verifyPlanExectionDone() throws IOException {
+      node = cluster.getDataNodes().get(dataNodeIndex);
+      assertEquals(node.queryDiskBalancerPlan().getResult(),
+          DiskBalancerWorkStatus.Result.PLAN_DONE);
+    }
+
+    /**
+     * Once diskBalancer is run, all volumes mush has some data.
+     */
+    public void verifyAllVolumesHaveData() throws IOException {
+      node = cluster.getDataNodes().get(dataNodeIndex);
+      try (FsDatasetSpi.FsVolumeReferences refs =
+               node.getFSDataset().getFsVolumeReferences()) {
+        for (FsVolumeSpi volume : refs) {
+          assertTrue(DiskBalancerTestUtil.getBlockCount(volume) > 0);
+          LOG.info(refs.toString() + " : Block Count : {}",
+              DiskBalancerTestUtil.getBlockCount(volume));
+        }
+      }
+    }
+
+    /**
+     * Verifies that tolerance values are honored correctly.
+     */
+    public void verifyTolerance(NodePlan plan, int planIndex, int
+        sourceDiskIndex, int tolerance) throws IOException {
+      // Tolerance
+      long delta = (plan.getVolumeSetPlans().get(planIndex).getBytesToMove()
+          * tolerance) / 100;
+      FsVolumeImpl volume = null;
+      try (FsDatasetSpi.FsVolumeReferences refs =
+               node.getFSDataset().getFsVolumeReferences()) {
+        volume = (FsVolumeImpl) refs.get(sourceDiskIndex);
+        assertTrue(DiskBalancerTestUtil.getBlockCount(volume) > 0);
+
+        assertTrue(
+            (DiskBalancerTestUtil.getBlockCount(volume) *
+                (blockSize + delta)) >=
+                plan.getVolumeSetPlans().get(0).getBytesToMove());
+      }
+    }
+  }
 }

@@ -358,14 +358,13 @@ public class TestDiskBalancerWithMockMover {
     private AtomicBoolean shouldRun;
     private FsDatasetSpi dataset;
-    private Integer runCount;
+    private int runCount;
     private volatile boolean sleepInCopyBlocks;
     private long delay;
 
     public TestMover(FsDatasetSpi dataset) {
       this.dataset = dataset;
       this.shouldRun = new AtomicBoolean(false);
-      this.runCount = new Integer(0);
     }
 
     public void setSleep() {
@@ -401,7 +400,7 @@ public class TestDiskBalancerWithMockMover {
       if (delay > 0) {
        Thread.sleep(delay);
       }
-      synchronized (runCount) {
+      synchronized (this) {
        if (shouldRun()) {
          runCount++;
        }
@@ -461,9 +460,9 @@ public class TestDiskBalancerWithMockMover {
     }
 
     public int getRunCount() {
-      synchronized (runCount) {
-        LOG.info("Run count : " + runCount.intValue());
-        return runCount.intValue();
+      synchronized (this) {
+        LOG.info("Run count : " + runCount);
+        return runCount;
       }
     }
   }
@@ -510,7 +509,7 @@ public class TestDiskBalancerWithMockMover {
     }
   }
 
-  private class DiskBalancerBuilder {
+  private static class DiskBalancerBuilder {
     private TestMover blockMover;
     private Configuration conf;
     private String nodeID;
@@ -546,7 +545,7 @@ public class TestDiskBalancerWithMockMover {
     }
   }
 
-  private class DiskBalancerClusterBuilder {
+  private static class DiskBalancerClusterBuilder {
    private String jsonFilePath;
    private Configuration conf;
@@ -573,7 +572,7 @@ public class TestDiskBalancerWithMockMover {
     }
   }
 
-  private class PlanBuilder {
+  private static class PlanBuilder {
    private String sourcePath;
    private String destPath;
    private String sourceUUID;
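Editor's note: the TestMover changes above replace a boxed Integer counter with a plain int and synchronize on this instead of on the counter. Locking on the counter itself is fragile because runCount++ autoboxes a new Integer, so successive critical sections can end up holding different locks and increments can be lost. The stand-alone sketch below illustrates the pitfall and the fix; the class names and thread counts are illustrative, not part of the patch.

/**
 * Illustrative sketch: why locking on a boxed Integer counter is unreliable.
 * The increment rebinds the field to a new Integer object, so later threads
 * may synchronize on a different lock than earlier ones.
 */
public class BoxedLockPitfall {

  static class BrokenCounter {
    private Integer count = 0;

    void increment() {
      synchronized (count) {         // lock object changes on every increment
        count++;                     // autoboxing: count now refers to a new Integer
      }
    }

    int get() { return count; }
  }

  static class FixedCounter {
    private int count = 0;

    synchronized void increment() {  // equivalent to synchronized (this)
      count++;
    }

    synchronized int get() { return count; }
  }

  public static void main(String[] args) throws InterruptedException {
    BrokenCounter broken = new BrokenCounter();
    FixedCounter fixed = new FixedCounter();
    Runnable work = () -> {
      for (int i = 0; i < 100_000; i++) {
        broken.increment();
        fixed.increment();
      }
    };
    Thread t1 = new Thread(work);
    Thread t2 = new Thread(work);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    // The fixed counter always prints 200000; the broken one may print less.
    System.out.println("broken = " + broken.get() + ", fixed = " + fixed.get());
  }
}

Making the helper builder classes static, also part of this patch, is a related cleanup that simply removes the implicit reference each inner class would otherwise hold to the outer test instance.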