HDFS-13439. Add test case for read block operation when it is moved. Contributed by Ajay Kumar.

This commit is contained in:
Arpit Agarwal 2018-04-16 14:16:59 -07:00
parent 805e33b62c
commit 43a2af5569
3 changed files with 111 additions and 4 deletions

View File

@ -970,7 +970,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @return newReplicaInfo
* @throws IOException
*/
private ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
@VisibleForTesting
ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
FsVolumeReference volumeRef) throws IOException {
ReplicaInfo newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
volumeRef);

View File

@ -87,6 +87,11 @@ public class BlockReaderTestUtil {
this(replicationFactor, new HdfsConfiguration());
}
public BlockReaderTestUtil(MiniDFSCluster cluster, HdfsConfiguration conf) {
this.conf = conf;
this.cluster = cluster;
}
public BlockReaderTestUtil(int replicationFactor, HdfsConfiguration config) throws Exception {
this.conf = config;
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replicationFactor);

View File

@ -20,17 +20,23 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -83,6 +89,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOUR
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
@ -832,8 +840,21 @@ public class TestFsDatasetImpl {
private ReplicaInfo createNewReplicaObj(ExtendedBlock block, FsDatasetImpl
fsDataSetImpl) throws IOException {
ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
FsVolumeSpi destVolume = null;
FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
return fsDataSetImpl.copyReplicaToVolume(block, replicaInfo,
destVolume.obtainReference());
}
/**
* Finds a new destination volume for block.
*
* @param block - Extended Block
* @param fsDataSetImpl - FsDatasetImpl reference
* @throws IOException
*/
private FsVolumeSpi getDestinationVolume(ExtendedBlock block, FsDatasetImpl
fsDataSetImpl) throws IOException {
FsVolumeSpi destVolume = null;
final String srcStorageId = fsDataSetImpl.getVolume(block).getStorageID();
try (FsVolumeReferences volumeReferences =
fsDataSetImpl.getFsVolumeReferences()) {
@ -844,8 +865,88 @@ public class TestFsDatasetImpl {
}
}
}
return fsDataSetImpl.copyReplicaToVolume(block, replicaInfo,
destVolume.obtainReference());
return destVolume;
}
@Test(timeout = 3000000)
public void testBlockReadOpWhileMovingBlock() throws IOException {
MiniDFSCluster cluster = null;
try {
// Setup cluster
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
.storagesPerDatanode(2)
.build();
FileSystem fs = cluster.getFileSystem();
DataNode dataNode = cluster.getDataNodes().get(0);
// Create test file with ASCII data
Path filePath = new Path("/tmp/testData");
String blockData = RandomStringUtils.randomAscii(512 * 4);
FSDataOutputStream fout = fs.create(filePath);
fout.writeBytes(blockData);
fout.close();
assertEquals(blockData, DFSTestUtil.readFile(fs, filePath));
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
BlockReaderTestUtil util = new BlockReaderTestUtil(cluster, new
HdfsConfiguration(conf));
LocatedBlock blk = util.getFileBlocks(filePath, 512 * 2).get(0);
File[] blkFiles = cluster.getAllBlockFiles(block);
// Part 1: Read partial data from block
LOG.info("Reading partial data for block {} before moving it: ",
blk.getBlock().toString());
BlockReader blkReader = BlockReaderTestUtil.getBlockReader(
(DistributedFileSystem) fs, blk, 0, 512 * 2);
byte[] buf = new byte[512 * 2];
blkReader.read(buf, 0, 512);
assertEquals(blockData.substring(0, 512), new String(buf,
StandardCharsets.US_ASCII).substring(0, 512));
// Part 2: Move block and than read remaining block
FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
assertNotNull("Destination volume should not be null.", destVolume);
fsDataSetImpl.moveBlock(block, replicaInfo, destVolume.obtainReference());
// Trigger block report to update block info in NN
cluster.triggerBlockReports();
blkReader.read(buf, 512, 512);
assertEquals(blockData.substring(0, 512 * 2), new String(buf,
StandardCharsets.US_ASCII).substring(0, 512 * 2));
blkReader = BlockReaderTestUtil.getBlockReader(
(DistributedFileSystem) fs,
blk, 0, blockData.length());
buf = new byte[512 * 4];
blkReader.read(buf, 0, 512 * 4);
assertEquals(blockData, new String(buf, StandardCharsets.US_ASCII));
// Part 3: 1. Close the block reader
// 2. Assert source block doesn't exist on initial volume
// 3. Assert new file location for block is different
// 4. Confirm client can read data from new location
blkReader.close();
ExtendedBlock block2 = DFSTestUtil.getFirstBlock(fs, filePath);
File[] blkFiles2 = cluster.getAllBlockFiles(block2);
blk = util.getFileBlocks(filePath, 512 * 4).get(0);
blkReader = BlockReaderTestUtil.getBlockReader(
(DistributedFileSystem) fs,
blk, 0, blockData.length());
blkReader.read(buf, 0, 512 * 4);
assertFalse(Files.exists(Paths.get(blkFiles[0].getAbsolutePath())));
assertNotEquals(blkFiles[0], blkFiles2[0]);
assertEquals(blockData, new String(buf, StandardCharsets.US_ASCII));
} finally {
if (cluster.isClusterUp()) {
cluster.shutdown();
}
}
}
}