HDFS-15549. Use Hardlink to move replica between DISK and ARCHIVE storage if on same filesystem mount (#2583). Contributed by Leon Gao.
This commit is contained in:
parent
3d46141583
commit
7743d40ac5
|
@ -153,7 +153,7 @@ public class HardLink {
|
|||
*/
|
||||
|
||||
/**
|
||||
* Creates a hardlink
|
||||
* Creates a hardlink.
|
||||
* @param file - existing source file
|
||||
* @param linkName - desired target link file
|
||||
*/
|
||||
|
|
|
@ -92,6 +92,11 @@ public enum StorageType {
|
|||
return StorageType.valueOf(StringUtils.toUpperCase(s));
|
||||
}
|
||||
|
||||
public static boolean allowSameDiskTiering(StorageType storageType) {
|
||||
return storageType == StorageType.DISK
|
||||
|| storageType == StorageType.ARCHIVE;
|
||||
}
|
||||
|
||||
private static List<StorageType> getNonTransientTypes() {
|
||||
List<StorageType> nonTransientTypes = new ArrayList<>();
|
||||
for (StorageType t : VALUES) {
|
||||
|
|
|
@ -322,7 +322,8 @@ public class DirectoryScanner implements Runnable {
|
|||
* Start the scanner. The scanner will run every
|
||||
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
|
||||
*/
|
||||
void start() {
|
||||
@VisibleForTesting
|
||||
public void start() {
|
||||
shouldRun.set(true);
|
||||
long firstScanTime = ThreadLocalRandom.current().nextLong(scanPeriodMsecs);
|
||||
|
||||
|
|
|
@ -48,6 +48,7 @@ import javax.management.NotCompliantMBeanException;
|
|||
import javax.management.ObjectName;
|
||||
import javax.management.StandardMBean;
|
||||
|
||||
import org.apache.hadoop.fs.HardLink;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
|
@ -994,6 +995,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
smallBufferSize, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Link the block and meta files for the given block to the given destination.
|
||||
* @return the new meta and block files.
|
||||
* @throws IOException
|
||||
*/
|
||||
static File[] hardLinkBlockFiles(long blockId, long genStamp,
|
||||
ReplicaInfo srcReplica, File destRoot) throws IOException {
|
||||
final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
|
||||
// blockName is same as the filename for the block
|
||||
final File dstFile = new File(destDir, srcReplica.getBlockName());
|
||||
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
|
||||
return hardLinkBlockFiles(srcReplica, dstMeta, dstFile);
|
||||
}
|
||||
|
||||
static File[] copyBlockFiles(ReplicaInfo srcReplica, File dstMeta,
|
||||
File dstFile, boolean calculateChecksum,
|
||||
int smallBufferSize, final Configuration conf)
|
||||
|
@ -1026,6 +1041,34 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
return new File[] {dstMeta, dstFile};
|
||||
}
|
||||
|
||||
static File[] hardLinkBlockFiles(ReplicaInfo srcReplica, File dstMeta,
|
||||
File dstFile)
|
||||
throws IOException {
|
||||
// Create parent folder if not exists.
|
||||
srcReplica.getFileIoProvider()
|
||||
.mkdirs(srcReplica.getVolume(), dstFile.getParentFile());
|
||||
try {
|
||||
HardLink.createHardLink(
|
||||
new File(srcReplica.getBlockURI()), dstFile);
|
||||
} catch (IOException e) {
|
||||
throw new IOException("Failed to hardLink "
|
||||
+ srcReplica + " block file to "
|
||||
+ dstFile, e);
|
||||
}
|
||||
try {
|
||||
HardLink.createHardLink(
|
||||
new File(srcReplica.getMetadataURI()), dstMeta);
|
||||
} catch (IOException e) {
|
||||
throw new IOException("Failed to hardLink "
|
||||
+ srcReplica + " metadata to "
|
||||
+ dstMeta, e);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info("Linked " + srcReplica.getBlockURI() + " to " + dstFile);
|
||||
}
|
||||
return new File[]{dstMeta, dstFile};
|
||||
}
|
||||
|
||||
/**
|
||||
* Move block files from one storage to another storage.
|
||||
* @return Returns the Old replicaInfo
|
||||
|
@ -1058,12 +1101,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
FsVolumeReference volumeRef = null;
|
||||
boolean shouldConsiderSameMountVolume =
|
||||
shouldConsiderSameMountVolume(replicaInfo.getVolume(),
|
||||
targetStorageType, targetStorageId);
|
||||
boolean useVolumeOnSameMount = false;
|
||||
|
||||
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
||||
volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
|
||||
if (shouldConsiderSameMountVolume) {
|
||||
volumeRef = volumes.getVolumeByMount(targetStorageType,
|
||||
((FsVolumeImpl) replicaInfo.getVolume()).getMount(),
|
||||
block.getNumBytes());
|
||||
if (volumeRef != null) {
|
||||
useVolumeOnSameMount = true;
|
||||
}
|
||||
}
|
||||
if (!useVolumeOnSameMount) {
|
||||
volumeRef = volumes.getNextVolume(
|
||||
targetStorageType,
|
||||
targetStorageId,
|
||||
block.getNumBytes()
|
||||
);
|
||||
}
|
||||
}
|
||||
try {
|
||||
moveBlock(block, replicaInfo, volumeRef);
|
||||
moveBlock(block, replicaInfo, volumeRef, useVolumeOnSameMount);
|
||||
} finally {
|
||||
if (volumeRef != null) {
|
||||
volumeRef.close();
|
||||
|
@ -1074,20 +1135,54 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
return replicaInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* When configuring DISK/ARCHIVE on same volume,
|
||||
* check if we should find the counterpart on the same disk mount.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
boolean shouldConsiderSameMountVolume(FsVolumeSpi fsVolume,
|
||||
StorageType targetStorageType, String targetStorageID) {
|
||||
if (targetStorageID != null && !targetStorageID.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
if (!(fsVolume instanceof FsVolumeImpl)
|
||||
|| ((FsVolumeImpl) fsVolume).getMount().isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
StorageType sourceStorageType = fsVolume.getStorageType();
|
||||
// Source/dest storage types are different
|
||||
if (sourceStorageType == targetStorageType) {
|
||||
return false;
|
||||
}
|
||||
// Source/dest storage types are either DISK or ARCHIVE.
|
||||
return StorageType.allowSameDiskTiering(sourceStorageType)
|
||||
&& StorageType.allowSameDiskTiering(targetStorageType);
|
||||
}
|
||||
|
||||
/**
|
||||
* Moves a block from a given volume to another.
|
||||
*
|
||||
* @param block - Extended Block
|
||||
* @param replicaInfo - ReplicaInfo
|
||||
* @param volumeRef - Volume Ref - Closed by caller.
|
||||
* @param moveBlockToLocalMount - Whether we use shortcut
|
||||
* to move block on same mount.
|
||||
* @return newReplicaInfo
|
||||
* @throws IOException
|
||||
*/
|
||||
@VisibleForTesting
|
||||
ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
|
||||
FsVolumeReference volumeRef) throws IOException {
|
||||
ReplicaInfo newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
|
||||
FsVolumeReference volumeRef, boolean moveBlockToLocalMount)
|
||||
throws IOException {
|
||||
ReplicaInfo newReplicaInfo;
|
||||
if (moveBlockToLocalMount) {
|
||||
newReplicaInfo = moveReplicaToVolumeOnSameMount(block, replicaInfo,
|
||||
volumeRef);
|
||||
} else {
|
||||
newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
|
||||
volumeRef);
|
||||
}
|
||||
|
||||
finalizeNewReplica(newReplicaInfo, block);
|
||||
removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId());
|
||||
return newReplicaInfo;
|
||||
|
@ -1128,6 +1223,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
return newReplicaInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shortcut to use hardlink to move blocks on same mount.
|
||||
* This is useful when moving blocks between storage types on same disk mount.
|
||||
* Two cases need to be considered carefully:
|
||||
* 1) Datanode restart in the middle should not cause data loss.
|
||||
* We use hardlink to avoid this.
|
||||
* 2) Finalized blocks can be reopened to append.
|
||||
* This is already handled by dataset lock and gen stamp.
|
||||
* See HDFS-12942
|
||||
*
|
||||
* @param block - Extended Block
|
||||
* @param replicaInfo - ReplicaInfo
|
||||
* @param volumeRef - Volume Ref - Closed by caller.
|
||||
* @return newReplicaInfo new replica object created in specified volume.
|
||||
* @throws IOException
|
||||
*/
|
||||
@VisibleForTesting
|
||||
ReplicaInfo moveReplicaToVolumeOnSameMount(ExtendedBlock block,
|
||||
ReplicaInfo replicaInfo,
|
||||
FsVolumeReference volumeRef) throws IOException {
|
||||
FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
|
||||
// Move files to temp dir first
|
||||
ReplicaInfo newReplicaInfo = targetVolume.hardLinkBlockToTmpLocation(block,
|
||||
replicaInfo);
|
||||
return newReplicaInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Finalizes newReplica by calling finalizeReplica internally.
|
||||
*
|
||||
|
@ -1177,7 +1299,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
try {
|
||||
moveBlock(block, replicaInfo, volumeRef);
|
||||
moveBlock(block, replicaInfo, volumeRef, false);
|
||||
} finally {
|
||||
if (volumeRef != null) {
|
||||
volumeRef.close();
|
||||
|
|
|
@ -484,9 +484,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
// should share the same amount of reserved capacity.
|
||||
// When calculating actual non dfs used,
|
||||
// exclude DFS used capacity by another volume.
|
||||
if (enableSameDiskTiering &&
|
||||
(storageType == StorageType.DISK
|
||||
|| storageType == StorageType.ARCHIVE)) {
|
||||
if (enableSameDiskTiering
|
||||
&& StorageType.allowSameDiskTiering(storageType)) {
|
||||
StorageType counterpartStorageType = storageType == StorageType.DISK
|
||||
? StorageType.ARCHIVE : StorageType.DISK;
|
||||
FsVolumeReference counterpartRef = dataset
|
||||
|
@ -1529,6 +1528,24 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
return newReplicaInfo;
|
||||
}
|
||||
|
||||
public ReplicaInfo hardLinkBlockToTmpLocation(ExtendedBlock block,
|
||||
ReplicaInfo replicaInfo) throws IOException {
|
||||
|
||||
File[] blockFiles = FsDatasetImpl.hardLinkBlockFiles(block.getBlockId(),
|
||||
block.getGenerationStamp(), replicaInfo,
|
||||
getTmpDir(block.getBlockPoolId()));
|
||||
|
||||
ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY)
|
||||
.setBlockId(replicaInfo.getBlockId())
|
||||
.setGenerationStamp(replicaInfo.getGenerationStamp())
|
||||
.setFsVolume(this)
|
||||
.setDirectoryToUse(blockFiles[0].getParentFile())
|
||||
.setBytesToReserve(0)
|
||||
.build();
|
||||
newReplicaInfo.setNumBytes(blockFiles[1].length());
|
||||
return newReplicaInfo;
|
||||
}
|
||||
|
||||
public File[] copyBlockToLazyPersistLocation(String bpId, long blockId,
|
||||
long genStamp,
|
||||
ReplicaInfo replicaInfo,
|
||||
|
|
|
@ -111,6 +111,30 @@ class FsVolumeList {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get volume by disk mount to place a block.
|
||||
* This is useful for same disk tiering.
|
||||
*
|
||||
* @param storageType The desired {@link StorageType}
|
||||
* @param mount Disk mount of the volume
|
||||
* @param blockSize Free space needed on the volume
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
FsVolumeReference getVolumeByMount(StorageType storageType,
|
||||
String mount, long blockSize) throws IOException {
|
||||
if (!enableSameDiskTiering) {
|
||||
return null;
|
||||
}
|
||||
FsVolumeReference volume = mountVolumeMap
|
||||
.getVolumeRefByMountAndStorageType(mount, storageType);
|
||||
// Check if volume has enough capacity
|
||||
if (volume != null && volume.getVolume().getAvailable() > blockSize) {
|
||||
return volume;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get next volume.
|
||||
*
|
||||
|
@ -354,9 +378,8 @@ class FsVolumeList {
|
|||
* Check if same disk tiering is applied to the volume.
|
||||
*/
|
||||
private boolean isSameDiskTieringApplied(FsVolumeImpl target) {
|
||||
return enableSameDiskTiering &&
|
||||
(target.getStorageType() == StorageType.DISK
|
||||
|| target.getStorageType() == StorageType.ARCHIVE);
|
||||
return enableSameDiskTiering
|
||||
&& StorageType.allowSameDiskTiering(target.getStorageType());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -17,9 +17,12 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
import java.io.OutputStream;
|
||||
|
@ -68,6 +71,7 @@ import org.apache.hadoop.io.MultipleIOException;
|
|||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
import org.apache.hadoop.util.FakeTimer;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.junit.Assert;
|
||||
|
@ -1070,24 +1074,43 @@ public class TestFsDatasetImpl {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When moving blocks using hardLink or copy
|
||||
* and append happened in the middle,
|
||||
* block movement should fail and hardlink is removed.
|
||||
*/
|
||||
@Test(timeout = 30000)
|
||||
public void testMoveBlockFailure() {
|
||||
// Test copy
|
||||
testMoveBlockFailure(conf);
|
||||
// Test hardlink
|
||||
conf.setBoolean(DFSConfigKeys
|
||||
.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
|
||||
conf.setDouble(DFSConfigKeys
|
||||
.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
|
||||
testMoveBlockFailure(conf);
|
||||
}
|
||||
|
||||
private void testMoveBlockFailure(Configuration config) {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1)
|
||||
.storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
|
||||
.storageTypes(
|
||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
|
||||
.storagesPerDatanode(2)
|
||||
.build();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||
|
||||
Path filePath = new Path("testData");
|
||||
DFSTestUtil.createFile(fs, filePath, 100, (short) 1, 0);
|
||||
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
|
||||
long fileLen = 100;
|
||||
ExtendedBlock block = createTestFile(fs, fileLen, filePath);
|
||||
|
||||
FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
|
||||
ReplicaInfo newReplicaInfo = createNewReplicaObj(block, fsDataSetImpl);
|
||||
ReplicaInfo newReplicaInfo =
|
||||
createNewReplicaObjWithLink(block, fsDataSetImpl);
|
||||
|
||||
// Append to file to update its GS
|
||||
FSDataOutputStream out = fs.append(filePath, (short) 1);
|
||||
|
@ -1095,6 +1118,7 @@ public class TestFsDatasetImpl {
|
|||
out.hflush();
|
||||
|
||||
// Call finalizeNewReplica
|
||||
assertTrue(newReplicaInfo.blockDataExists());
|
||||
LOG.info("GenerationStamp of old replica: {}",
|
||||
block.getGenerationStamp());
|
||||
LOG.info("GenerationStamp of new replica: {}", fsDataSetImpl
|
||||
|
@ -1103,6 +1127,9 @@ public class TestFsDatasetImpl {
|
|||
LambdaTestUtils.intercept(IOException.class, "Generation Stamp "
|
||||
+ "should be monotonically increased.",
|
||||
() -> fsDataSetImpl.finalizeNewReplica(newReplicaInfo, block));
|
||||
assertFalse(newReplicaInfo.blockDataExists());
|
||||
|
||||
validateFileLen(fs, fileLen, filePath);
|
||||
} catch (Exception ex) {
|
||||
LOG.info("Exception in testMoveBlockFailure ", ex);
|
||||
fail("Exception while testing testMoveBlockFailure ");
|
||||
|
@ -1143,6 +1170,253 @@ public class TestFsDatasetImpl {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure datanode restart can clean up un-finalized links,
|
||||
* if the block is not finalized yet.
|
||||
*/
|
||||
@Test(timeout = 30000)
|
||||
public void testDnRestartWithHardLinkInTmp() {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
conf.setBoolean(DFSConfigKeys
|
||||
.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
|
||||
conf.setDouble(DFSConfigKeys
|
||||
.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1)
|
||||
.storageTypes(
|
||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
|
||||
.storagesPerDatanode(2)
|
||||
.build();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||
|
||||
Path filePath = new Path("testData");
|
||||
long fileLen = 100;
|
||||
|
||||
ExtendedBlock block = createTestFile(fs, fileLen, filePath);
|
||||
|
||||
FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
|
||||
|
||||
ReplicaInfo oldReplicaInfo = fsDataSetImpl.getReplicaInfo(block);
|
||||
ReplicaInfo newReplicaInfo =
|
||||
createNewReplicaObjWithLink(block, fsDataSetImpl);
|
||||
|
||||
// Link exists
|
||||
assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
|
||||
|
||||
cluster.restartDataNode(0);
|
||||
cluster.waitDatanodeFullyStarted(cluster.getDataNodes().get(0), 60000);
|
||||
cluster.triggerBlockReports();
|
||||
|
||||
// Un-finalized replica data (hard link) is deleted as they were in /tmp
|
||||
assertFalse(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
|
||||
|
||||
// Old block is there.
|
||||
assertTrue(Files.exists(Paths.get(oldReplicaInfo.getBlockURI())));
|
||||
|
||||
validateFileLen(fs, fileLen, filePath);
|
||||
|
||||
} catch (Exception ex) {
|
||||
LOG.info("Exception in testDnRestartWithHardLinkInTmp ", ex);
|
||||
fail("Exception while testing testDnRestartWithHardLinkInTmp ");
|
||||
} finally {
|
||||
if (cluster.isClusterUp()) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If new block is finalized and DN restarted,
|
||||
* DiskScanner should clean up the hardlink correctly.
|
||||
*/
|
||||
@Test(timeout = 30000)
|
||||
public void testDnRestartWithHardLink() {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
conf.setBoolean(DFSConfigKeys
|
||||
.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
|
||||
conf.setDouble(DFSConfigKeys
|
||||
.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1)
|
||||
.storageTypes(
|
||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
|
||||
.storagesPerDatanode(2)
|
||||
.build();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||
|
||||
Path filePath = new Path("testData");
|
||||
long fileLen = 100;
|
||||
|
||||
ExtendedBlock block = createTestFile(fs, fileLen, filePath);
|
||||
|
||||
FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
|
||||
|
||||
final ReplicaInfo oldReplicaInfo = fsDataSetImpl.getReplicaInfo(block);
|
||||
|
||||
fsDataSetImpl.finalizeNewReplica(
|
||||
createNewReplicaObjWithLink(block, fsDataSetImpl), block);
|
||||
|
||||
ReplicaInfo newReplicaInfo = fsDataSetImpl.getReplicaInfo(block);
|
||||
|
||||
cluster.restartDataNode(0);
|
||||
cluster.waitDatanodeFullyStarted(cluster.getDataNodes().get(0), 60000);
|
||||
cluster.triggerBlockReports();
|
||||
|
||||
assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
|
||||
assertTrue(Files.exists(Paths.get(oldReplicaInfo.getBlockURI())));
|
||||
|
||||
DirectoryScanner scanner = new DirectoryScanner(
|
||||
cluster.getDataNodes().get(0).getFSDataset(), conf);
|
||||
scanner.start();
|
||||
scanner.run();
|
||||
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override public Boolean get() {
|
||||
return !Files.exists(Paths.get(oldReplicaInfo.getBlockURI()));
|
||||
}
|
||||
}, 100, 10000);
|
||||
assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI())));
|
||||
|
||||
validateFileLen(fs, fileLen, filePath);
|
||||
|
||||
} catch (Exception ex) {
|
||||
LOG.info("Exception in testDnRestartWithHardLink ", ex);
|
||||
fail("Exception while testing testDnRestartWithHardLink ");
|
||||
} finally {
|
||||
if (cluster.isClusterUp()) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testMoveBlockSuccessWithSameMountMove() {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
conf.setBoolean(DFSConfigKeys
|
||||
.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
|
||||
conf.setDouble(DFSConfigKeys
|
||||
.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1)
|
||||
.storageTypes(
|
||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
|
||||
.storagesPerDatanode(2)
|
||||
.build();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||
Path filePath = new Path("testData");
|
||||
long fileLen = 100;
|
||||
|
||||
ExtendedBlock block = createTestFile(fs, fileLen, filePath);
|
||||
|
||||
FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
|
||||
assertEquals(StorageType.DISK,
|
||||
fsDataSetImpl.getReplicaInfo(block).getVolume().getStorageType());
|
||||
|
||||
FsDatasetImpl fsDataSetImplSpy =
|
||||
spy((FsDatasetImpl) dataNode.getFSDataset());
|
||||
fsDataSetImplSpy.moveBlockAcrossStorage(
|
||||
block, StorageType.ARCHIVE, null);
|
||||
|
||||
// Make sure it is done thru hardlink
|
||||
verify(fsDataSetImplSpy).moveBlock(any(), any(), any(), eq(true));
|
||||
|
||||
assertEquals(StorageType.ARCHIVE,
|
||||
fsDataSetImpl.getReplicaInfo(block).getVolume().getStorageType());
|
||||
validateFileLen(fs, fileLen, filePath);
|
||||
|
||||
} catch (Exception ex) {
|
||||
LOG.info("Exception in testMoveBlockSuccessWithSameMountMove ", ex);
|
||||
fail("testMoveBlockSuccessWithSameMountMove operation should succeed");
|
||||
} finally {
|
||||
if (cluster.isClusterUp()) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Move should fail if the volume on same mount has no space.
|
||||
@Test(timeout = 30000)
|
||||
public void testMoveBlockWithSameMountMoveWithoutSpace() {
|
||||
MiniDFSCluster cluster = null;
|
||||
try {
|
||||
conf.setBoolean(DFSConfigKeys
|
||||
.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
|
||||
conf.setDouble(DFSConfigKeys
|
||||
.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.0);
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1)
|
||||
.storageTypes(
|
||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
|
||||
.storagesPerDatanode(2)
|
||||
.build();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||
Path filePath = new Path("testData");
|
||||
long fileLen = 100;
|
||||
|
||||
ExtendedBlock block = createTestFile(fs, fileLen, filePath);
|
||||
|
||||
FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
|
||||
assertEquals(StorageType.DISK,
|
||||
fsDataSetImpl.getReplicaInfo(block).getVolume().getStorageType());
|
||||
|
||||
FsDatasetImpl fsDataSetImplSpy =
|
||||
spy((FsDatasetImpl) dataNode.getFSDataset());
|
||||
fsDataSetImplSpy.moveBlockAcrossStorage(
|
||||
block, StorageType.ARCHIVE, null);
|
||||
|
||||
fail("testMoveBlockWithSameMountMoveWithoutSpace operation" +
|
||||
" should failed");
|
||||
} catch (Exception ex) {
|
||||
assertTrue(ex instanceof DiskChecker.DiskOutOfSpaceException);
|
||||
} finally {
|
||||
if (cluster.isClusterUp()) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// More tests on shouldConsiderSameMountVolume.
|
||||
@Test(timeout = 10000)
|
||||
public void testShouldConsiderSameMountVolume() throws IOException {
|
||||
FsVolumeImpl volume = new FsVolumeImplBuilder()
|
||||
.setConf(conf)
|
||||
.setDataset(dataset)
|
||||
.setStorageID("storage-id")
|
||||
.setStorageDirectory(
|
||||
new StorageDirectory(StorageLocation.parse(BASE_DIR)))
|
||||
.build();
|
||||
assertFalse(dataset.shouldConsiderSameMountVolume(volume,
|
||||
StorageType.ARCHIVE, null));
|
||||
|
||||
conf.setBoolean(DFSConfigKeys
|
||||
.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
|
||||
conf.setDouble(DFSConfigKeys
|
||||
.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
|
||||
0.5);
|
||||
volume = new FsVolumeImplBuilder()
|
||||
.setConf(conf)
|
||||
.setDataset(dataset)
|
||||
.setStorageID("storage-id")
|
||||
.setStorageDirectory(
|
||||
new StorageDirectory(StorageLocation.parse(BASE_DIR)))
|
||||
.build();
|
||||
assertTrue(dataset.shouldConsiderSameMountVolume(volume,
|
||||
StorageType.ARCHIVE, null));
|
||||
assertTrue(dataset.shouldConsiderSameMountVolume(volume,
|
||||
StorageType.ARCHIVE, ""));
|
||||
assertFalse(dataset.shouldConsiderSameMountVolume(volume,
|
||||
StorageType.DISK, null));
|
||||
assertFalse(dataset.shouldConsiderSameMountVolume(volume,
|
||||
StorageType.ARCHIVE, "target"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new temporary replica of replicaInfo object in another volume.
|
||||
*
|
||||
|
@ -1158,6 +1432,38 @@ public class TestFsDatasetImpl {
|
|||
destVolume.obtainReference());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new temporary replica of replicaInfo object in another volume.
|
||||
*
|
||||
* @param block - Extended Block
|
||||
* @param fsDataSetImpl - FsDatasetImpl reference
|
||||
* @throws IOException
|
||||
*/
|
||||
private ReplicaInfo createNewReplicaObjWithLink(ExtendedBlock block,
|
||||
FsDatasetImpl fsDataSetImpl) throws IOException {
|
||||
ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
|
||||
FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
|
||||
return fsDataSetImpl.moveReplicaToVolumeOnSameMount(block, replicaInfo,
|
||||
destVolume.obtainReference());
|
||||
}
|
||||
|
||||
private ExtendedBlock createTestFile(FileSystem fs,
|
||||
long fileLen, Path filePath) throws IOException {
|
||||
DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, 0);
|
||||
return DFSTestUtil.getFirstBlock(fs, filePath);
|
||||
}
|
||||
|
||||
private void validateFileLen(FileSystem fs,
|
||||
long fileLen, Path filePath) throws IOException {
|
||||
// Read data file to make sure it is good.
|
||||
InputStream in = fs.open(filePath);
|
||||
int bytesCount = 0;
|
||||
while (in.read() != -1) {
|
||||
bytesCount++;
|
||||
}
|
||||
assertTrue(fileLen <= bytesCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds a new destination volume for block.
|
||||
*
|
||||
|
@ -1225,7 +1531,8 @@ public class TestFsDatasetImpl {
|
|||
ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
|
||||
FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
|
||||
assertNotNull("Destination volume should not be null.", destVolume);
|
||||
fsDataSetImpl.moveBlock(block, replicaInfo, destVolume.obtainReference());
|
||||
fsDataSetImpl.moveBlock(block, replicaInfo,
|
||||
destVolume.obtainReference(), false);
|
||||
// Trigger block report to update block info in NN
|
||||
cluster.triggerBlockReports();
|
||||
blkReader.read(buf, 512, 512);
|
||||
|
|
|
@ -446,6 +446,12 @@ public class TestMover {
|
|||
final Configuration conf = new HdfsConfiguration();
|
||||
initConf(conf);
|
||||
testWithinSameNode(conf);
|
||||
// Test movement with hardlink, when same disk tiering is enabled.
|
||||
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
|
||||
conf.setDouble(DFSConfigKeys
|
||||
.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
|
||||
testWithinSameNode(conf);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, false);
|
||||
}
|
||||
|
||||
private void checkMovePaths(List<Path> actual, Path... expected) {
|
||||
|
|
Loading…
Reference in New Issue