diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index c1412930a85..e253e07e0d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -970,7 +970,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @return newReplicaInfo
    * @throws IOException
    */
-  private ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
+  @VisibleForTesting
+  ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
       FsVolumeReference volumeRef) throws IOException {
     ReplicaInfo newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
         volumeRef);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/BlockReaderTestUtil.java
index c5979210e2b..57f5cf82f2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/BlockReaderTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/BlockReaderTestUtil.java
@@ -87,6 +87,11 @@ public class BlockReaderTestUtil {
     this(replicationFactor, new HdfsConfiguration());
   }
 
+  public BlockReaderTestUtil(MiniDFSCluster cluster, HdfsConfiguration conf) {
+    this.conf = conf;
+    this.cluster = cluster;
+  }
+
   public BlockReaderTestUtil(int replicationFactor, HdfsConfiguration config) throws Exception {
     this.conf = config;
     conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replicationFactor);
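Note on the two hunks above: moveBlock() is widened from private to package-private and tagged @VisibleForTesting so a test can drive an intra-DataNode replica move directly, and BlockReaderTestUtil gains a constructor that wraps an already-running MiniDFSCluster instead of starting its own. A minimal sketch of how a test would combine the two (the class name and scaffolding here are illustrative assumptions, not part of the patch):

    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;

    public class BlockReaderReuseSketch {
      public static void main(String[] args) throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        MiniDFSCluster cluster =
            new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
        try {
          cluster.waitActive();
          // Wrap the running cluster; no second MiniDFSCluster is started.
          BlockReaderTestUtil util =
              new BlockReaderTestUtil(cluster, new HdfsConfiguration(conf));
          // util.getFileBlocks(...) and BlockReaderTestUtil.getBlockReader(...)
          // now target blocks on this cluster, as the new test below does.
        } finally {
          cluster.shutdown();
        }
      }
    }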
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 3c4b657983a..d684950426f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -20,17 +20,24 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -83,6 +90,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOUR
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
@@ -832,8 +841,21 @@ public class TestFsDatasetImpl {
 
   private ReplicaInfo createNewReplicaObj(ExtendedBlock block, FsDatasetImpl
       fsDataSetImpl) throws IOException {
     ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
-    FsVolumeSpi destVolume = null;
+    FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
+    return fsDataSetImpl.copyReplicaToVolume(block, replicaInfo,
+        destVolume.obtainReference());
+  }
+
+  /**
+   * Finds a new destination volume for block.
+   *
+   * @param block - Extended Block
+   * @param fsDataSetImpl - FsDatasetImpl reference
+   * @throws IOException
+   */
+  private FsVolumeSpi getDestinationVolume(ExtendedBlock block, FsDatasetImpl
+      fsDataSetImpl) throws IOException {
+    FsVolumeSpi destVolume = null;
     final String srcStorageId = fsDataSetImpl.getVolume(block).getStorageID();
     try (FsVolumeReferences volumeReferences =
         fsDataSetImpl.getFsVolumeReferences()) {
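The body of the try-with-resources block above is unchanged by the patch, so the diff elides it; from the surrounding context it is a loop that picks any volume whose storage ID differs from the block's source volume. A plausible reconstruction of the complete helper, for reading convenience (the loop body is inferred, not shown in the patch):

    private FsVolumeSpi getDestinationVolume(ExtendedBlock block, FsDatasetImpl
        fsDataSetImpl) throws IOException {
      FsVolumeSpi destVolume = null;
      final String srcStorageId = fsDataSetImpl.getVolume(block).getStorageID();
      try (FsVolumeReferences volumeReferences =
          fsDataSetImpl.getFsVolumeReferences()) {
        // Inferred: take the first volume that does not hold the replica.
        for (FsVolumeSpi volume : volumeReferences) {
          if (!volume.getStorageID().equals(srcStorageId)) {
            destVolume = volume;
            break;
          }
        }
      }
      return destVolume;
    }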
@@ -844,8 +866,88 @@ public class TestFsDatasetImpl {
         }
       }
     }
-    return fsDataSetImpl.copyReplicaToVolume(block, replicaInfo,
-        destVolume.obtainReference());
+    return destVolume;
+  }
+
+  @Test(timeout = 3000000)
+  public void testBlockReadOpWhileMovingBlock() throws IOException {
+    MiniDFSCluster cluster = null;
+    try {
+
+      // Setup cluster
+      conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(1)
+          .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
+          .storagesPerDatanode(2)
+          .build();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+
+      // Create test file with ASCII data
+      Path filePath = new Path("/tmp/testData");
+      String blockData = RandomStringUtils.randomAscii(512 * 4);
+      FSDataOutputStream fout = fs.create(filePath);
+      fout.writeBytes(blockData);
+      fout.close();
+      assertEquals(blockData, DFSTestUtil.readFile(fs, filePath));
+
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+      BlockReaderTestUtil util = new BlockReaderTestUtil(cluster,
+          new HdfsConfiguration(conf));
+      LocatedBlock blk = util.getFileBlocks(filePath, 512 * 2).get(0);
+      File[] blkFiles = cluster.getAllBlockFiles(block);
+
+      // Part 1: Read partial data from block
+      LOG.info("Reading partial data for block {} before moving it: ",
+          blk.getBlock().toString());
+      BlockReader blkReader = BlockReaderTestUtil.getBlockReader(
+          (DistributedFileSystem) fs, blk, 0, 512 * 2);
+      byte[] buf = new byte[512 * 2];
+      blkReader.read(buf, 0, 512);
+      assertEquals(blockData.substring(0, 512), new String(buf,
+          StandardCharsets.US_ASCII).substring(0, 512));
+
+      // Part 2: Move block and then read remaining block
+      FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+      ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
+      FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
+      assertNotNull("Destination volume should not be null.", destVolume);
+      fsDataSetImpl.moveBlock(block, replicaInfo, destVolume.obtainReference());
+      // Trigger block report to update block info in NN
+      cluster.triggerBlockReports();
+      blkReader.read(buf, 512, 512);
+      assertEquals(blockData.substring(0, 512 * 2), new String(buf,
+          StandardCharsets.US_ASCII).substring(0, 512 * 2));
+      blkReader = BlockReaderTestUtil.getBlockReader(
+          (DistributedFileSystem) fs,
+          blk, 0, blockData.length());
+      buf = new byte[512 * 4];
+      blkReader.read(buf, 0, 512 * 4);
+      assertEquals(blockData, new String(buf, StandardCharsets.US_ASCII));
+
+      // Part 3: 1. Close the block reader
+      //         2. Assert source block doesn't exist on initial volume
+      //         3. Assert new file location for block is different
+      //         4. Confirm client can read data from new location
+      blkReader.close();
+      ExtendedBlock block2 = DFSTestUtil.getFirstBlock(fs, filePath);
+      File[] blkFiles2 = cluster.getAllBlockFiles(block2);
+      blk = util.getFileBlocks(filePath, 512 * 4).get(0);
+      blkReader = BlockReaderTestUtil.getBlockReader(
+          (DistributedFileSystem) fs,
+          blk, 0, blockData.length());
+      blkReader.read(buf, 0, 512 * 4);
+
+      assertFalse(Files.exists(Paths.get(blkFiles[0].getAbsolutePath())));
+      assertNotEquals(blkFiles[0], blkFiles2[0]);
+      assertEquals(blockData, new String(buf, StandardCharsets.US_ASCII));
+
+    } finally {
+      if (cluster != null && cluster.isClusterUp()) {
+        cluster.shutdown();
+      }
+    }
   }
 
 }
\ No newline at end of file
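Taken together, the test pins down the behavior that motivated making moveBlock() testable: an intra-DataNode replica move must be transparent both to a BlockReader opened before the move and to readers opened afterwards, and the source replica file must be gone once the move completes. A condensed sketch of that contract check (pickOtherVolume is a hypothetical stand-in for the getDestinationVolume helper above):

    FsDatasetImpl ds = (FsDatasetImpl) dataNode.getFSDataset();
    ReplicaInfo replica = ds.getReplicaInfo(block);
    FsVolumeSpi target = pickOtherVolume(ds, block); // hypothetical helper
    ds.moveBlock(block, replica, target.obtainReference());
    // The replica is now served from the target volume...
    assertEquals(target.getStorageID(), ds.getVolume(block).getStorageID());
    // ...while readers opened before or after the move still see intact data.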