HDFS-9735. DiskBalancer : Refactor moveBlockAcrossStorage to be used by disk balancer. Contributed by Anu Engineer.

Anu Engineer 2016-04-11 15:58:06 -07:00 committed by Arpit Agarwal
parent 050677077b
commit 7820737cfa
7 changed files with 210 additions and 39 deletions

View File

@@ -628,4 +628,15 @@ ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
   * Confirm whether the block is deleting
   */
  boolean isDeletingBlock(String bpid, long blockId);

  /**
   * Moves a given block from one volume to another volume. This is used by
   * the disk balancer.
   *
   * @param block - ExtendedBlock
   * @param destination - Destination volume
   * @return Old replica info
   */
  ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
      FsVolumeSpi destination) throws IOException;
}
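For orientation, here is a minimal caller-side sketch of the new interface method: it drains every block of one volume onto another, following the same pattern as the DiskBalancerTestUtil#moveAllDataToDestVolume helper added later in this commit. The class name, the iterator name string, and the try-with-resources close of the iterator are illustrative additions, not part of the patch.

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

/** Illustrative only; mirrors DiskBalancerTestUtil#moveAllDataToDestVolume. */
public final class VolumeDrainSketch {
  private VolumeDrainSketch() {
  }

  public static void drain(FsDatasetSpi<?> dataset, FsVolumeSpi source,
      FsVolumeSpi dest) throws IOException {
    for (String bpid : source.getBlockPoolList()) {
      // One iterator per block pool; the name only labels saved iterator state.
      try (FsVolumeSpi.BlockIterator iter =
               source.newBlockIterator(bpid, "volume-drain-sketch")) {
        while (!iter.atEnd()) {
          ExtendedBlock block = iter.nextBlock();
          if (block != null) {
            // Copies the replica to dest, finalizes it there and removes the
            // old files; the returned old ReplicaInfo is ignored here.
            dataset.moveBlockAcrossVolumes(block, dest);
          }
        }
      }
    }
  }
}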

View File

@@ -947,29 +947,7 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
      volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
    }
    try {
      File oldBlockFile = replicaInfo.getBlockFile();
      File oldMetaFile = replicaInfo.getMetaFile();
      FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
      // Copy files to temp dir first
      File[] blockFiles = copyBlockFiles(block.getBlockId(),
          block.getGenerationStamp(), oldMetaFile, oldBlockFile,
          targetVolume.getTmpDir(block.getBlockPoolId()),
          replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
      ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
          replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
          targetVolume, blockFiles[0].getParentFile(), 0);
      newReplicaInfo.setNumBytes(blockFiles[1].length());
      // Finalize the copied files
      newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
      synchronized (this) {
        // Increment numBlocks here since this block was moved without the
        // block pool slice being notified.
        FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
        volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
      }
      removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
          oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
      moveBlock(block, replicaInfo, volumeRef);
    } finally {
      if (volumeRef != null) {
        volumeRef.close();
@@ -980,6 +958,77 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
    return replicaInfo;
  }

  /**
   * Moves a block from a given volume to another.
   *
   * @param block - Extended Block
   * @param replicaInfo - ReplicaInfo
   * @param volumeRef - Volume reference; closed by the caller.
   * @return newReplicaInfo
   * @throws IOException
   */
  private ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
      FsVolumeReference volumeRef) throws IOException {
    File oldBlockFile = replicaInfo.getBlockFile();
    File oldMetaFile = replicaInfo.getMetaFile();
    FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
    // Copy files to temp dir first
    File[] blockFiles = copyBlockFiles(block.getBlockId(),
        block.getGenerationStamp(), oldMetaFile, oldBlockFile,
        targetVolume.getTmpDir(block.getBlockPoolId()),
        replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
    ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
        replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
        targetVolume, blockFiles[0].getParentFile(), 0);
    newReplicaInfo.setNumBytes(blockFiles[1].length());
    // Finalize the copied files
    newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
    synchronized (this) {
      // Increment numBlocks here since this block was moved without the
      // block pool slice being notified.
      FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
      volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
    }
    removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
        oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
    return newReplicaInfo;
  }
  /**
   * Moves a given block from one volume to another volume. This is used by
   * the disk balancer.
   *
   * @param block - ExtendedBlock
   * @param destination - Destination volume
   * @return Old replica info
   */
  @Override
  public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block,
      FsVolumeSpi destination) throws IOException {
    ReplicaInfo replicaInfo = getReplicaInfo(block);
    if (replicaInfo.getState() != ReplicaState.FINALIZED) {
      throw new ReplicaNotFoundException(
          ReplicaNotFoundException.UNFINALIZED_REPLICA + block);
    }

    FsVolumeReference volumeRef = null;
    synchronized (this) {
      volumeRef = destination.obtainReference();
    }

    try {
      moveBlock(block, replicaInfo, volumeRef);
    } finally {
      if (volumeRef != null) {
        volumeRef.close();
      }
    }
    return replicaInfo;
  }

  /**
   * Compute and store the checksum for a block file that does not already have
   * its checksum computed.
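The implementation above only accepts FINALIZED replicas and signals anything else with ReplicaNotFoundException. A caller-side sketch of how that could be tolerated, rather than aborting a whole balancing pass, follows; the helper class and method names are hypothetical and not part of this patch.

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

/** Hypothetical helper, not part of this patch. */
public final class MoveOneBlockSketch {
  private MoveOneBlockSketch() {
  }

  /**
   * Attempts to move one block; returns false instead of propagating the
   * ReplicaNotFoundException that FsDatasetImpl throws for replicas that are
   * not yet FINALIZED.
   */
  public static boolean tryMove(FsDatasetSpi<?> dataset, ExtendedBlock block,
      FsVolumeSpi destination) throws IOException {
    try {
      dataset.moveBlockAcrossVolumes(block, destination);
      return true;
    } catch (ReplicaNotFoundException e) {
      // The replica is still being written (or was already removed); skip it
      // so a later pass can retry.
      return false;
    }
  }
}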

View File

@@ -711,6 +711,13 @@ public ExtendedBlock nextBlock() throws IOException {
            actualBlockDir.getPath());
        continue;
      }

      File blkFile = getBlockFile(bpid, block);
      File metaFile = FsDatasetUtil.findMetaFile(blkFile);
      block.setGenerationStamp(
          Block.getGenerationStamp(metaFile.getName()));
      block.setNumBytes(blkFile.length());

      LOG.trace("nextBlock({}, {}): advancing to {}",
          storageID, bpid, block);
      return block;
@@ -732,6 +739,12 @@ public ExtendedBlock nextBlock() throws IOException {
    }
  }

  private File getBlockFile(String bpid, ExtendedBlock blk)
      throws IOException {
    return new File(DatanodeUtil.idToBlockDir(getFinalizedDir(bpid),
        blk.getBlockId()).toString() + "/" + blk.getBlockName());
  }

  @Override
  public boolean atEnd() {
    return state.atEnd;
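With the nextBlock() change above, blocks returned by the volume iterator now carry their on-disk length and generation stamp. A small sketch of a hypothetical consumer (not part of this patch) that relies on that, summing per-block-pool usage straight from the iterator:

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

/** Hypothetical consumer of the iterator fix, not part of this patch. */
public final class VolumeUsageSketch {
  private VolumeUsageSketch() {
  }

  /** Sums the on-disk block bytes for one block pool on a volume. */
  public static long usedBlockBytes(FsVolumeSpi volume, String bpid)
      throws IOException {
    long total = 0;
    try (FsVolumeSpi.BlockIterator iter =
             volume.newBlockIterator(bpid, "usage-sketch")) {
      while (!iter.atEnd()) {
        ExtendedBlock block = iter.nextBlock();
        if (block != null) {
          // getNumBytes() is meaningful here because nextBlock() now fills in
          // the block length (and generation stamp) from the on-disk files.
          total += block.getNumBytes();
        }
      }
    }
    return total;
  }
}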

View File

@@ -1359,5 +1359,12 @@ public boolean getPinning(ExtendedBlock b) throws IOException {
  public boolean isDeletingBlock(String bpid, long blockId) {
    throw new UnsupportedOperationException();
  }

  @Override
  public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block,
      FsVolumeSpi destination) throws IOException {
    return null;
  }
}

View File

@@ -442,4 +442,12 @@ public boolean getPinning(ExtendedBlock block) throws IOException {
  public boolean isDeletingBlock(String bpid, long blockId) {
    return false;
  }

  @Override
  public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block,
      FsVolumeSpi destination) throws IOException {
    return null;
  }
}

View File

@@ -19,6 +19,9 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.NullConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
@@ -26,6 +29,7 @@
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
import org.apache.hadoop.util.Time;
import java.io.IOException;
import java.util.Random;
import java.util.UUID;
@@ -53,7 +57,6 @@ public DiskBalancerTestUtil() {
* Returns a random string.
*
* @param length - Number of chars in the string
*
* @return random String
*/
private String getRandomName(int length) {
@@ -122,7 +125,6 @@ public DiskBalancerVolume createRandomVolume() {
* Creates a Random Volume for testing purpose.
*
* @param type - StorageType
*
* @return DiskBalancerVolume
*/
public DiskBalancerVolume createRandomVolume(StorageType type) {
@@ -142,11 +144,9 @@ public DiskBalancerVolume createRandomVolume(StorageType type) {
/**
* Creates a RandomVolumeSet.
*
* @param type -Storage Type
* @param type - Storage Type
* @param diskCount - How many disks you need.
*
* @return volumeSet
*
* @throws Exception
*/
public DiskBalancerVolumeSet createRandomVolumeSet(StorageType type,
@@ -168,9 +168,7 @@ public DiskBalancerVolumeSet createRandomVolumeSet(StorageType type,
*
* @param diskTypes - Storage types needed in the Node
* @param diskCount - Disk count - that many disks of each type is created
*
* @return DataNode
*
* @throws Exception
*/
public DiskBalancerDataNode createRandomDataNode(StorageType[] diskTypes,
@@ -195,11 +193,9 @@ public DiskBalancerDataNode createRandomDataNode(StorageType[] diskTypes,
* Creates a RandomCluster.
*
* @param dataNodeCount - How many nodes you need
* @param diskTypes - StorageTypes you need in each node
* @param diskCount - How many disks you need of each type.
*
* @param diskTypes - StorageTypes you need in each node
* @param diskCount - How many disks you need of each type.
* @return Cluster
*
* @throws Exception
*/
public DiskBalancerCluster createRandCluster(int dataNodeCount,
@@ -224,4 +220,48 @@ public DiskBalancerCluster createRandCluster(int dataNodeCount,
return cluster;
}
  /**
   * Returns the number of blocks on a volume.
   *
   * @param source - Source Volume.
   * @return Number of Blocks.
   * @throws IOException
   */
  public static int getBlockCount(FsVolumeSpi source) throws IOException {
    int count = 0;
    for (String blockPoolID : source.getBlockPoolList()) {
      FsVolumeSpi.BlockIterator sourceIter =
          source.newBlockIterator(blockPoolID, "TestDiskBalancerSource");
      while (!sourceIter.atEnd()) {
        ExtendedBlock block = sourceIter.nextBlock();
        if (block != null) {
          count++;
        }
      }
    }
    return count;
  }

  /**
   * Moves all blocks to the destination volume.
   *
   * @param fsDataset - Dataset
   * @param source - Source Volume.
   * @param dest - Destination Volume.
   * @throws IOException
   */
  public static void moveAllDataToDestVolume(FsDatasetSpi fsDataset,
      FsVolumeSpi source, FsVolumeSpi dest) throws IOException {
    for (String blockPoolID : source.getBlockPoolList()) {
      FsVolumeSpi.BlockIterator sourceIter =
          source.newBlockIterator(blockPoolID, "TestDiskBalancerSource");
      while (!sourceIter.atEnd()) {
        ExtendedBlock block = sourceIter.nextBlock();
        if (block != null) {
          fsDataset.moveBlockAcrossVolumes(block, dest);
        }
      }
    }
  }
}

View File

@@ -19,35 +19,39 @@
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException.Result;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.MoveStep;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
import org.hamcrest.*;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.codehaus.jackson.map.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_UNDER_PROGRESS;
import static org.junit.Assert.assertTrue;
public class TestDiskBalancerRPC {
@Rule
@@ -227,6 +231,45 @@ public void testQueryPlanWithoutSubmit() throws Exception {
    Assert.assertTrue(status.getResult() == NO_PLAN);
  }

  @Test
  public void testMoveBlockAcrossVolume() throws Exception {
    Configuration conf = new HdfsConfiguration();
    final int DEFAULT_BLOCK_SIZE = 100;
    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
    String fileName = "/tmp.txt";
    Path filePath = new Path(fileName);
    final int numDatanodes = 1;
    final int dnIndex = 0;
    cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(numDatanodes).build();
    FsVolumeImpl source = null;
    FsVolumeImpl dest = null;
    try {
      cluster.waitActive();
      Random r = new Random();
      FileSystem fs = cluster.getFileSystem(dnIndex);
      DFSTestUtil.createFile(fs, filePath, 10 * 1024,
          (short) 1, r.nextLong());
      DataNode dnNode = cluster.getDataNodes().get(dnIndex);
      FsDatasetSpi.FsVolumeReferences refs =
          dnNode.getFSDataset().getFsVolumeReferences();
      try {
        source = (FsVolumeImpl) refs.get(0);
        dest = (FsVolumeImpl) refs.get(1);
        DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
            source, dest);
        assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
      } finally {
        refs.close();
      }
    } finally {
      cluster.shutdown();
    }
  }

  private class RpcTestHelper {
    private NodePlan plan;
    private int planVersion;