HDFS-13188. Disk Balancer: Support multiple block pools during block move. Contributed by Bharat Viswanadham.
parent e6f99e205b
commit 7060725662
DiskBalancer.java
@@ -958,8 +958,8 @@ public class DiskBalancer {
       ExtendedBlock block = null;
       while (block == null && currentCount < poolIters.size()) {
         currentCount++;
-        poolIndex = poolIndex++ % poolIters.size();
-        FsVolumeSpi.BlockIterator currentPoolIter = poolIters.get(poolIndex);
+        int index = poolIndex++ % poolIters.size();
+        FsVolumeSpi.BlockIterator currentPoolIter = poolIters.get(index);
         block = getBlockToCopy(currentPoolIter, item);
       }
 
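The core fix above removes a classic Java post-increment pitfall: `poolIndex = poolIndex++ % poolIters.size()` saves the old value of poolIndex, increments the field, then immediately overwrites it with the old value modulo the pool count, so the index never advances and only the first block pool's iterator is ever consulted. Reading the round-robin position into a separate local lets the post-increment take effect. A minimal standalone sketch of the difference (variable names are illustrative, not the Hadoop source):

// PostIncrementDemo.java -- illustrative only, not part of the patch.
public class PostIncrementDemo {
  public static void main(String[] args) {
    final int pools = 2;

    // Buggy pattern: the assignment writes the pre-increment value back,
    // discarding the increment, so the index is pinned at its start value.
    int poolIndex = 0;
    for (int i = 0; i < 3; i++) {
      poolIndex = poolIndex++ % pools;
      System.out.println("buggy index: " + poolIndex);   // prints 0, 0, 0
    }

    // Fixed pattern (as in the patch): consume the current value via a
    // local, and let the post-increment advance the shared counter.
    poolIndex = 0;
    for (int i = 0; i < 3; i++) {
      int index = poolIndex++ % pools;
      System.out.println("fixed index: " + index);       // prints 0, 1, 0
    }
  }
}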
DiskBalancerTestUtil.java
@@ -40,6 +40,10 @@ import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
 import org.apache.hadoop.util.Time;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -51,6 +55,7 @@ import java.util.concurrent.TimeoutException;
  * Helper class to create various cluster configurations at run time.
  */
 public class DiskBalancerTestUtil {
+  static final Logger LOG = LoggerFactory.getLogger(TestDiskBalancer.class);
   public static final long MB = 1024 * 1024L;
   public static final long GB = MB * 1024L;
   public static final long TB = GB * 1024L;
@@ -241,17 +246,25 @@ public class DiskBalancerTestUtil {
    * @return Number of Blocks.
    * @throws IOException
    */
-  public static int getBlockCount(FsVolumeSpi source) throws IOException {
+  public static int getBlockCount(FsVolumeSpi source,
+                                  boolean checkblockPoolCount)
+      throws IOException {
     int count = 0;
     for (String blockPoolID : source.getBlockPoolList()) {
       FsVolumeSpi.BlockIterator sourceIter =
           source.newBlockIterator(blockPoolID, "TestDiskBalancerSource");
+      int blockCount = 0;
       while (!sourceIter.atEnd()) {
         ExtendedBlock block = sourceIter.nextBlock();
         if (block != null) {
-          count++;
+          blockCount++;
         }
       }
+      if (checkblockPoolCount) {
+        LOG.info("Block Pool Id: {}, blockCount: {}", blockPoolID, blockCount);
+        assertTrue(blockCount > 0);
+      }
+      count += blockCount;
     }
     return count;
   }
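The new checkblockPoolCount flag changes what the helper can prove: the aggregate count is positive as soon as any one pool holds blocks, while the per-pool assertion requires every pool on the volume to hold blocks, which is exactly the condition the multi-block-pool fix must establish. A minimal sketch of the distinction, with invented pool counts:

// BlockPoolCountDemo.java -- illustrative only; pool counts are made up.
public class BlockPoolCountDemo {
  public static void main(String[] args) {
    // Two hypothetical block pools on one volume; the second was starved
    // by the old, stuck round-robin index.
    int[] blocksPerPool = {100, 0};

    int total = 0;
    boolean everyPoolHasBlocks = true;
    for (int blockCount : blocksPerPool) {
      everyPoolHasBlocks &= blockCount > 0;   // per-pool check
      total += blockCount;                    // aggregate check
    }

    System.out.println("aggregate > 0:       " + (total > 0));           // true
    System.out.println("every pool has data: " + everyPoolHasBlocks);    // false
  }
}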
@@ -320,10 +333,10 @@ public class DiskBalancerTestUtil {
         dnNode.getFSDataset().getFsVolumeReferences()) {
       source = (FsVolumeImpl) refs.get(0);
       dest = (FsVolumeImpl) refs.get(1);
-      assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
+      assertTrue(DiskBalancerTestUtil.getBlockCount(source, true) > 0);
       DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
           source, dest);
-      assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
+      assertEquals(0, DiskBalancerTestUtil.getBlockCount(source, false));
     }
   }
 
TestDiskBalancer.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -160,6 +161,62 @@ public class TestDiskBalancer {
     }
   }
 
+  @Test
+  public void testDiskBalancerWithFederatedCluster() throws Exception {
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+    final int blockCount = 100;
+    final int blockSize = 1024;
+    final int diskCount = 2;
+    final int dataNodeCount = 1;
+    final int dataNodeIndex = 0;
+    final int sourceDiskIndex = 0;
+    final long cap = blockSize * 3L * blockCount;
+
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
+        .numDataNodes(dataNodeCount)
+        .storagesPerDatanode(diskCount)
+        .storageCapacities(new long[] {cap, cap})
+        .build();
+    cluster.waitActive();
+
+    DFSTestUtil.setFederatedConfiguration(cluster, conf);
+
+    final String fileName = "/tmp.txt";
+    final Path filePath = new Path(fileName);
+    long fileLen = blockCount * blockSize;
+
+
+    FileSystem fs = cluster.getFileSystem(0);
+    TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
+        0);
+    DFSTestUtil.waitReplication(fs, filePath, (short) 1);
+
+    fs = cluster.getFileSystem(1);
+    TestBalancer.createFile(cluster, filePath, fileLen, (short) 1,
+        1);
+    DFSTestUtil.waitReplication(fs, filePath, (short) 1);
+
+    try {
+      DataMover dataMover = new DataMover(cluster, dataNodeIndex,
+          sourceDiskIndex, conf, blockSize, blockCount);
+      dataMover.moveDataToSourceDisk();
+      NodePlan plan = dataMover.generatePlan();
+      dataMover.executePlan(plan);
+      dataMover.verifyPlanExectionDone();
+      dataMover.verifyAllVolumesHaveData();
+      dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
+    } finally {
+      cluster.shutdown();
+    }
+
+  }
+
   @Test
   public void testBalanceDataBetweenMultiplePairsOfVolumes()
       throws Exception {
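The new test above exercises the bug directly: MiniDFSNNTopology.simpleFederatedTopology(2) starts two namenodes, and under HDFS federation each namespace registers its own block pool on every datanode, so the single datanode's volumes carry two block pools and the balancer's round-robin must visit both. A hedged sketch of how a test might enumerate those pools, using only APIs that appear in this patch (getFsVolumeReferences, getBlockPoolList); the surrounding variable names and setup are assumptions:

// Sketch: list the block pools hosted by each volume of a datanode.
// Assumes an already-running MiniDFSCluster `cluster`; not part of the patch.
DataNode dn = cluster.getDataNodes().get(0);
try (FsDatasetSpi.FsVolumeReferences refs =
         dn.getFSDataset().getFsVolumeReferences()) {
  for (FsVolumeSpi volume : refs) {
    // Under simpleFederatedTopology(2) this list has two entries,
    // one block pool ID per federated namespace.
    for (String bpid : volume.getBlockPoolList()) {
      System.out.println(volume + " hosts block pool " + bpid);
    }
  }
}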
@@ -599,9 +656,9 @@ public class TestDiskBalancer {
     try (FsDatasetSpi.FsVolumeReferences refs =
         node.getFSDataset().getFsVolumeReferences()) {
       for (FsVolumeSpi volume : refs) {
-        assertTrue(DiskBalancerTestUtil.getBlockCount(volume) > 0);
-        LOG.info(refs.toString() + " : Block Count : {}",
-            DiskBalancerTestUtil.getBlockCount(volume));
+        assertTrue(DiskBalancerTestUtil.getBlockCount(volume, true) > 0);
+        LOG.info("{} : Block Count : {}", refs, DiskBalancerTestUtil
+            .getBlockCount(volume, true));
       }
     }
   }
@@ -618,12 +675,11 @@ public class TestDiskBalancer {
     try (FsDatasetSpi.FsVolumeReferences refs =
         node.getFSDataset().getFsVolumeReferences()) {
       volume = (FsVolumeImpl) refs.get(sourceDiskIndex);
-      assertTrue(DiskBalancerTestUtil.getBlockCount(volume) > 0);
+      assertTrue(DiskBalancerTestUtil.getBlockCount(volume, true) > 0);
 
-      assertTrue(
-          (DiskBalancerTestUtil.getBlockCount(volume) *
-              (blockSize + delta)) >=
-              plan.getVolumeSetPlans().get(0).getBytesToMove());
+      assertTrue((DiskBalancerTestUtil.getBlockCount(volume, true) *
+          (blockSize + delta)) >= plan.getVolumeSetPlans().get(0)
+          .getBytesToMove());
     }
   }
 }
TestDiskBalancerRPC.java
@@ -51,7 +51,7 @@ import java.util.Random;
 import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
 import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
 import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_UNDER_PROGRESS;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Test DiskBalancer RPC.
@@ -265,7 +265,7 @@ public class TestDiskBalancerRPC {
       dest = (FsVolumeImpl) refs.get(1);
       DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
           source, dest);
-      assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
+      assertEquals(0, DiskBalancerTestUtil.getBlockCount(source, false));
     } finally {
       refs.close();
     }