diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 277b271e741..eeab098b6f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -628,4 +628,15 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * Confirm whether the block is deleting
    */
   boolean isDeletingBlock(String bpid, long blockId);
+
+  /**
+   * Moves a given block from one volume to another volume. This is used by disk
+   * balancer.
+   *
+   * @param block - ExtendedBlock
+   * @param destination - Destination volume
+   * @return Old replica info
+   */
+  ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
+      FsVolumeSpi destination) throws IOException;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index b0422979a7e..2b405384287 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -947,29 +947,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
     }
     try {
-      File oldBlockFile = replicaInfo.getBlockFile();
-      File oldMetaFile = replicaInfo.getMetaFile();
-      FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
-      // Copy files to temp dir first
-      File[] blockFiles = copyBlockFiles(block.getBlockId(),
-          block.getGenerationStamp(), oldMetaFile, oldBlockFile,
-          targetVolume.getTmpDir(block.getBlockPoolId()),
-          replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
-
-      ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
-          replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
-          targetVolume, blockFiles[0].getParentFile(), 0);
-      newReplicaInfo.setNumBytes(blockFiles[1].length());
-      // Finalize the copied files
-      newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
-      synchronized (this) {
-        // Increment numBlocks here as this block moved without knowing to BPS
-        FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
-        volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
-      }
-
-      removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
-          oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
+      moveBlock(block, replicaInfo, volumeRef);
     } finally {
       if (volumeRef != null) {
         volumeRef.close();
@@ -980,6 +958,77 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return replicaInfo;
   }
 
+  /**
+   * Moves a block from a given volume to another.
+   *
+   * @param block - Extended Block
+   * @param replicaInfo - ReplicaInfo
+   * @param volumeRef - Volume Ref - Closed by caller.
+   * @return newReplicaInfo
+   * @throws IOException
+   */
+  private ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
+      FsVolumeReference volumeRef) throws
+      IOException {
+    File oldBlockFile = replicaInfo.getBlockFile();
+    File oldMetaFile = replicaInfo.getMetaFile();
+    FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
+    // Copy files to temp dir first
+    File[] blockFiles = copyBlockFiles(block.getBlockId(),
+        block.getGenerationStamp(), oldMetaFile, oldBlockFile,
+        targetVolume.getTmpDir(block.getBlockPoolId()),
+        replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
+
+    ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
+        replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
+        targetVolume, blockFiles[0].getParentFile(), 0);
+    newReplicaInfo.setNumBytes(blockFiles[1].length());
+    // Finalize the copied files
+    newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
+    synchronized (this) {
+      // Increment numBlocks here as this block moved without knowing to BPS
+      FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
+      volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
+    }
+
+    removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
+        oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
+    return newReplicaInfo;
+  }
+
+  /**
+   * Moves a given block from one volume to another volume. This is used by disk
+   * balancer.
+   *
+   * @param block - ExtendedBlock
+   * @param destination - Destination volume
+   * @return Old replica info
+   */
+  @Override
+  public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block, FsVolumeSpi
+      destination) throws IOException {
+    ReplicaInfo replicaInfo = getReplicaInfo(block);
+    if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNFINALIZED_REPLICA + block);
+    }
+
+    FsVolumeReference volumeRef = null;
+
+    synchronized (this) {
+      volumeRef = destination.obtainReference();
+    }
+
+    try {
+      moveBlock(block, replicaInfo, volumeRef);
+    } finally {
+      if (volumeRef != null) {
+        volumeRef.close();
+      }
+    }
+    return replicaInfo;
+  }
+
   /**
    * Compute and store the checksum for a block file that does not already have
    * its checksum computed.
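
Not part of the patch above: a minimal caller-side sketch of how the new FsDatasetSpi#moveBlockAcrossVolumes contract is expected to be used. It mirrors the DiskBalancerTestUtil#moveAllDataToDestVolume helper added later in this change (iterate a source volume's block pools with FsVolumeSpi.BlockIterator and hand each block to the dataset). The class name VolumeDrainSketch, the method name drainVolume, and the iterator name "volumeDrainSketch" are illustrative assumptions, not code from this patch.

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

public final class VolumeDrainSketch {
  private VolumeDrainSketch() {
  }

  // Moves every block found on the source volume to the destination volume.
  // A replica that is not FINALIZED makes moveBlockAcrossVolumes throw
  // ReplicaNotFoundException (an IOException), as shown in FsDatasetImpl above.
  public static void drainVolume(FsDatasetSpi<?> dataset, FsVolumeSpi source,
      FsVolumeSpi dest) throws IOException {
    for (String bpid : source.getBlockPoolList()) {
      try (FsVolumeSpi.BlockIterator iter =
               source.newBlockIterator(bpid, "volumeDrainSketch")) {
        while (!iter.atEnd()) {
          ExtendedBlock block = iter.nextBlock();
          if (block != null) {
            // Relies on the FsDatasetSpi#moveBlockAcrossVolumes contract
            // introduced in this patch.
            dataset.moveBlockAcrossVolumes(block, dest);
          }
        }
      }
    }
  }
}
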
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 68e2537c39f..4a446d4e954 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -711,6 +711,13 @@ public class FsVolumeImpl implements FsVolumeSpi {
               actualBlockDir.getPath());
           continue;
         }
+
+        File blkFile = getBlockFile(bpid, block);
+        File metaFile = FsDatasetUtil.findMetaFile(blkFile);
+        block.setGenerationStamp(
+            Block.getGenerationStamp(metaFile.getName()));
+        block.setNumBytes(blkFile.length());
+
         LOG.trace("nextBlock({}, {}): advancing to {}",
             storageID, bpid, block);
         return block;
@@ -732,6 +739,12 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
   }
 
+  private File getBlockFile(String bpid, ExtendedBlock blk)
+      throws IOException {
+    return new File(DatanodeUtil.idToBlockDir(getFinalizedDir(bpid),
+        blk.getBlockId()).toString() + "/" + blk.getBlockName());
+  }
+
   @Override
   public boolean atEnd() {
     return state.atEnd;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 25034c6d8e2..24f4a52038d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1359,5 +1359,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<SimulatedFSDataset.SimulatedVolume> {
   public boolean isDeletingBlock(String bpid, long blockId) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block,
+      FsVolumeSpi destination) throws IOException {
+    return null;
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index c872e612517..8518ddd2a9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -442,4 +442,12 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   public boolean isDeletingBlock(String bpid, long blockId) {
     return false;
   }
+
+  @Override
+  public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block,
+      FsVolumeSpi destination)
+      throws IOException {
+    return null;
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java
index 96139199094..43bb18409d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.hdfs.server.diskbalancer;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.NullConnector;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
@@ -26,6 +29,7 @@
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
 import org.apache.hadoop.util.Time;
 
+import java.io.IOException;
 import java.util.Random;
 import java.util.UUID;
@@ -53,7 +57,6 @@ public class DiskBalancerTestUtil {
    * Returns a random string.
    *
    * @param length - Number of chars in the string
-   *
    * @return random String
    */
   private String getRandomName(int length) {
@@ -122,7 +125,6 @@ public class DiskBalancerTestUtil {
    * Creates a Random Volume for testing purpose.
    *
    * @param type - StorageType
-   *
    * @return DiskBalancerVolume
    */
   public DiskBalancerVolume createRandomVolume(StorageType type) {
@@ -142,11 +144,9 @@ public class DiskBalancerTestUtil {
   /**
    * Creates a RandomVolumeSet.
    *
-   * @param type -Storage Type
+   * @param type - Storage Type
    * @param diskCount - How many disks you need.
-   *
    * @return volumeSet
-   *
    * @throws Exception
    */
   public DiskBalancerVolumeSet createRandomVolumeSet(StorageType type,
@@ -168,9 +168,7 @@ public class DiskBalancerTestUtil {
    *
    * @param diskTypes - Storage types needed in the Node
    * @param diskCount - Disk count - that many disks of each type is created
-   *
    * @return DataNode
-   *
    * @throws Exception
    */
   public DiskBalancerDataNode createRandomDataNode(StorageType[] diskTypes,
@@ -195,11 +193,9 @@ public class DiskBalancerTestUtil {
    * Creates a RandomCluster.
    *
    * @param dataNodeCount - How many nodes you need
-   * @param diskTypes - StorageTypes you need in each node
-   * @param diskCount - How many disks you need of each type.
-   *
+   * @param diskTypes     - StorageTypes you need in each node
+   * @param diskCount     - How many disks you need of each type.
    * @return Cluster
-   *
    * @throws Exception
    */
   public DiskBalancerCluster createRandCluster(int dataNodeCount,
@@ -224,4 +220,48 @@ public class DiskBalancerTestUtil {
     return cluster;
   }
 
+  /**
+   * Returns the number of blocks on a volume.
+   *
+   * @param source - Source Volume.
+   * @return Number of Blocks.
+   * @throws IOException
+   */
+  public static int getBlockCount(FsVolumeSpi source) throws IOException {
+    int count = 0;
+    for (String blockPoolID : source.getBlockPoolList()) {
+      FsVolumeSpi.BlockIterator sourceIter =
+          source.newBlockIterator(blockPoolID, "TestDiskBalancerSource");
+      while (!sourceIter.atEnd()) {
+        ExtendedBlock block = sourceIter.nextBlock();
+        if (block != null) {
+          count++;
+        }
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Moves all blocks to the destination volume.
+   *
+   * @param fsDataset - Dataset
+   * @param source - Source Volume.
+   * @param dest - Destination Volume.
+   * @throws IOException
+   */
+  public static void moveAllDataToDestVolume(FsDatasetSpi fsDataset,
+      FsVolumeSpi source, FsVolumeSpi dest) throws IOException {
+
+    for (String blockPoolID : source.getBlockPoolList()) {
+      FsVolumeSpi.BlockIterator sourceIter =
+          source.newBlockIterator(blockPoolID, "TestDiskBalancerSource");
+      while (!sourceIter.atEnd()) {
+        ExtendedBlock block = sourceIter.nextBlock();
+        if (block != null) {
+          fsDataset.moveBlockAcrossVolumes(block, dest);
+        }
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
index 27cd8eb763a..81a0609371a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
@@ -19,35 +19,39 @@
 package org.apache.hadoop.hdfs.server.diskbalancer;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
-import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException.*;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException.Result;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
-import org.apache.hadoop.hdfs.server.diskbalancer.planner.MoveStep;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
-import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
-import org.hamcrest.*;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
-import org.codehaus.jackson.map.ObjectMapper;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Random;
 
 import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
 import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
 import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_UNDER_PROGRESS;
+import static org.junit.Assert.assertTrue;
 
 public class TestDiskBalancerRPC {
   @Rule
@@ -227,6 +231,45 @@ public class TestDiskBalancerRPC {
     Assert.assertTrue(status.getResult() == NO_PLAN);
   }
 
+  @Test
+  public void testMoveBlockAcrossVolume() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final int DEFAULT_BLOCK_SIZE = 100;
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
+    String fileName = "/tmp.txt";
+    Path filePath = new Path(fileName);
+    final int numDatanodes = 1;
+    final int dnIndex = 0;
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDatanodes).build();
+    FsVolumeImpl source = null;
+    FsVolumeImpl dest = null;
+    try {
+      cluster.waitActive();
+      Random r = new Random();
+      FileSystem fs = cluster.getFileSystem(dnIndex);
+      DFSTestUtil.createFile(fs, filePath, 10 * 1024,
+          (short) 1, r.nextLong());
+      DataNode dnNode = cluster.getDataNodes().get(dnIndex);
+      FsDatasetSpi.FsVolumeReferences refs =
+          dnNode.getFSDataset().getFsVolumeReferences();
+      try {
+        source = (FsVolumeImpl) refs.get(0);
+        dest = (FsVolumeImpl) refs.get(1);
+        DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
+            source, dest);
+        assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
+      } finally {
+        refs.close();
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+
   private class RpcTestHelper {
     private NodePlan plan;
     private int planVersion;