HDFS-6978. Directory scanner should correctly reconcile blocks on RAM disk. (Arpit Agarwal)
Conflicts: hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
This commit is contained in:
parent
2fdd9f51ce
commit
6cc206b8ad
|
@ -83,6 +83,7 @@ public class DirectoryScanner implements Runnable {
|
|||
long missingBlockFile = 0;
|
||||
long missingMemoryBlocks = 0;
|
||||
long mismatchBlocks = 0;
|
||||
long duplicateBlocks = 0;
|
||||
|
||||
public Stats(String bpid) {
|
||||
this.bpid = bpid;
|
||||
|
@ -440,7 +441,7 @@ public class DirectoryScanner implements Runnable {
|
|||
int d = 0; // index for blockpoolReport
|
||||
int m = 0; // index for memReprot
|
||||
while (m < memReport.length && d < blockpoolReport.length) {
|
||||
Block memBlock = memReport[Math.min(m, memReport.length - 1)];
|
||||
FinalizedReplica memBlock = memReport[Math.min(m, memReport.length - 1)];
|
||||
ScanInfo info = blockpoolReport[Math.min(
|
||||
d, blockpoolReport.length - 1)];
|
||||
if (info.getBlockId() < memBlock.getBlockId()) {
|
||||
|
@ -468,9 +469,23 @@ public class DirectoryScanner implements Runnable {
|
|||
// or block file length is different than expected
|
||||
statsRecord.mismatchBlocks++;
|
||||
addDifference(diffRecord, statsRecord, info);
|
||||
} else if (info.getBlockFile().compareTo(memBlock.getBlockFile()) != 0) {
|
||||
// volumeMap record and on-disk files don't match.
|
||||
statsRecord.duplicateBlocks++;
|
||||
addDifference(diffRecord, statsRecord, info);
|
||||
}
|
||||
d++;
|
||||
m++;
|
||||
|
||||
if (d < blockpoolReport.length) {
|
||||
// There may be multiple on-disk records for the same block, don't increment
|
||||
// the memory record pointer if so.
|
||||
ScanInfo nextInfo = blockpoolReport[Math.min(d, blockpoolReport.length - 1)];
|
||||
if (nextInfo.getBlockId() != info.blockId) {
|
||||
++m;
|
||||
}
|
||||
} else {
|
||||
++m;
|
||||
}
|
||||
}
|
||||
while (m < memReport.length) {
|
||||
FinalizedReplica current = memReport[m++];
|
||||
|
|
|
@ -269,15 +269,16 @@ class BlockPoolSlice {
|
|||
/**
|
||||
* Save the given replica to persistent storage.
|
||||
*
|
||||
* @param replicaInfo
|
||||
* @return The saved meta and block files, in that order.
|
||||
* @throws IOException
|
||||
*/
|
||||
File[] lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException {
|
||||
File[] lazyPersistReplica(long blockId, long genStamp,
|
||||
File srcMeta, File srcFile) throws IOException {
|
||||
if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
|
||||
FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
|
||||
}
|
||||
File targetFiles[] = FsDatasetImpl.copyBlockFiles(replicaInfo, lazypersistDir);
|
||||
File targetFiles[] = FsDatasetImpl.copyBlockFiles(
|
||||
blockId, genStamp, srcMeta, srcFile, lazypersistDir);
|
||||
dfsUsage.incDfsUsed(targetFiles[0].length() + targetFiles[1].length());
|
||||
return targetFiles;
|
||||
}
|
||||
|
@ -510,7 +511,7 @@ class BlockPoolSlice {
|
|||
* @return the replica that is retained.
|
||||
* @throws IOException
|
||||
*/
|
||||
private ReplicaInfo resolveDuplicateReplicas(
|
||||
ReplicaInfo resolveDuplicateReplicas(
|
||||
final ReplicaInfo replica1, final ReplicaInfo replica2,
|
||||
final ReplicaMap volumeMap) throws IOException {
|
||||
|
||||
|
|
|
@ -703,17 +703,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Copy the block and meta files for the given block from the given
|
||||
* Copy the block and meta files for the given block to the given destination.
|
||||
* @return the new meta and block files.
|
||||
* @throws IOException
|
||||
*/
|
||||
static File[] copyBlockFiles(ReplicaInfo replicaInfo, File destRoot)
|
||||
static File[] copyBlockFiles(long blockId, long genStamp,
|
||||
File srcMeta, File srcFile, File destRoot)
|
||||
throws IOException {
|
||||
final File destDir = DatanodeUtil.idToBlockDir(destRoot, replicaInfo.getBlockId());
|
||||
final File dstFile = new File(destDir, replicaInfo.getBlockName());
|
||||
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, replicaInfo.getGenerationStamp());
|
||||
final File srcMeta = replicaInfo.getMetaFile();
|
||||
final File srcFile = replicaInfo.getBlockFile();
|
||||
final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
|
||||
final File dstFile = new File(destDir, srcFile.getName());
|
||||
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
|
||||
try {
|
||||
FileUtils.copyFile(srcMeta, dstMeta);
|
||||
} catch (IOException e) {
|
||||
|
@ -1847,10 +1846,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
File memFile = memBlockInfo.getBlockFile();
|
||||
if (memFile.exists()) {
|
||||
if (memFile.compareTo(diskFile) != 0) {
|
||||
LOG.warn("Block file " + memFile.getAbsolutePath()
|
||||
+ " does not match file found by scan "
|
||||
+ diskFile.getAbsolutePath());
|
||||
// TODO: Should the diskFile be deleted?
|
||||
if (diskMetaFile.exists()) {
|
||||
if (memBlockInfo.getMetaFile().exists()) {
|
||||
// We have two sets of block+meta files. Decide which one to
|
||||
// keep.
|
||||
ReplicaInfo diskBlockInfo = new FinalizedReplica(
|
||||
blockId, diskFile.length(), diskGS, vol, diskFile.getParentFile());
|
||||
((FsVolumeImpl) vol).getBlockPoolSlice(bpid).resolveDuplicateReplicas(
|
||||
memBlockInfo, diskBlockInfo, volumeMap);
|
||||
}
|
||||
} else {
|
||||
if (!diskFile.delete()) {
|
||||
LOG.warn("Failed to delete " + diskFile + ". Will retry on next scan");
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Block refers to a block file that does not exist.
|
||||
|
@ -2315,6 +2324,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
FsVolumeImpl targetVolume;
|
||||
ReplicaInfo replicaInfo;
|
||||
BlockPoolSlice bpSlice;
|
||||
File srcFile, srcMeta;
|
||||
long genStamp;
|
||||
|
||||
synchronized (FsDatasetImpl.this) {
|
||||
replicaInfo = volumeMap.get(bpid, blockId);
|
||||
|
@ -2336,10 +2348,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
|
||||
File[] savedFiles = targetVolume.getBlockPoolSlice(bpid)
|
||||
.lazyPersistReplica(replicaInfo);
|
||||
lazyWriteReplicaTracker.recordEndLazyPersist(
|
||||
bpid, blockId, savedFiles[0], savedFiles[1]);
|
||||
bpSlice = targetVolume.getBlockPoolSlice(bpid);
|
||||
srcMeta = replicaInfo.getMetaFile();
|
||||
srcFile = replicaInfo.getBlockFile();
|
||||
genStamp = replicaInfo.getGenerationStamp();
|
||||
}
|
||||
|
||||
// Drop the FsDatasetImpl lock for the file copy.
|
||||
File[] savedFiles =
|
||||
bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);
|
||||
|
||||
synchronized (FsDatasetImpl.this) {
|
||||
lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
|
||||
|
@ -2360,7 +2380,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
try {
|
||||
replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
|
||||
if (replicaState != null) {
|
||||
// Move the replica outside the lock.
|
||||
moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
|
||||
}
|
||||
succeeded = true;
|
||||
|
@ -2459,9 +2478,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
// Remove the old replicas from transient storage.
|
||||
if (blockFile.delete() || !blockFile.exists()) {
|
||||
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, blockFileUsed);
|
||||
}
|
||||
if (metaFile.delete() || !metaFile.exists()) {
|
||||
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
|
||||
if (metaFile.delete() || !metaFile.exists()) {
|
||||
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
|
||||
}
|
||||
}
|
||||
|
||||
// If deletion failed then the directory scanner will cleanup the blocks
|
||||
|
|
|
@ -161,9 +161,13 @@ class LazyWriteReplicaTracker {
|
|||
replicaState.lazyPersistVolume = checkpointVolume;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bpid
|
||||
* @param blockId
|
||||
* @param savedFiles The saved meta and block files, in that order.
|
||||
*/
|
||||
synchronized void recordEndLazyPersist(
|
||||
final String bpid, final long blockId,
|
||||
final File savedMetaFile, final File savedBlockFile) {
|
||||
final String bpid, final long blockId, final File[] savedFiles) {
|
||||
Map<Long, ReplicaState> map = replicaMaps.get(bpid);
|
||||
ReplicaState replicaState = map.get(blockId);
|
||||
|
||||
|
@ -172,8 +176,8 @@ class LazyWriteReplicaTracker {
|
|||
bpid + "; blockId=" + blockId);
|
||||
}
|
||||
replicaState.state = State.LAZY_PERSIST_COMPLETE;
|
||||
replicaState.savedMetaFile = savedMetaFile;
|
||||
replicaState.savedBlockFile = savedBlockFile;
|
||||
replicaState.savedMetaFile = savedFiles[0];
|
||||
replicaState.savedBlockFile = savedFiles[1];
|
||||
|
||||
if (replicasNotPersisted.peek() == replicaState) {
|
||||
// Common case.
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
@ -31,22 +33,21 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.*;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -60,22 +61,29 @@ public class TestDirectoryScanner {
|
|||
|
||||
private MiniDFSCluster cluster;
|
||||
private String bpid;
|
||||
private DFSClient client;
|
||||
private FsDatasetSpi<? extends FsVolumeSpi> fds = null;
|
||||
private DirectoryScanner scanner = null;
|
||||
private final Random rand = new Random();
|
||||
private final Random r = new Random();
|
||||
private static final int BLOCK_LENGTH = 100;
|
||||
|
||||
static {
|
||||
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
|
||||
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH);
|
||||
CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
|
||||
CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
|
||||
}
|
||||
|
||||
/** create a file with a length of <code>fileLen</code> */
|
||||
private void createFile(String fileName, long fileLen) throws IOException {
|
||||
private List<LocatedBlock> createFile(String fileNamePrefix,
|
||||
long fileLen,
|
||||
boolean isLazyPersist) throws IOException {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
Path filePath = new Path(fileName);
|
||||
DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, r.nextLong());
|
||||
Path filePath = new Path("/" + fileNamePrefix + ".dat");
|
||||
DFSTestUtil.createFile(
|
||||
fs, filePath, isLazyPersist, 1024, fileLen,
|
||||
BLOCK_LENGTH, (short) 1, r.nextLong(), false);
|
||||
return client.getLocatedBlocks(filePath.toString(), 0, fileLen).getLocatedBlocks();
|
||||
}
|
||||
|
||||
/** Truncate a block file */
|
||||
|
@ -134,6 +142,43 @@ public class TestDirectoryScanner {
|
|||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Duplicate the given block on all volumes.
|
||||
* @param blockId
|
||||
* @throws IOException
|
||||
*/
|
||||
private void duplicateBlock(long blockId) throws IOException {
|
||||
synchronized (fds) {
|
||||
ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
|
||||
for (FsVolumeSpi v : fds.getVolumes()) {
|
||||
if (v.getStorageID().equals(b.getVolume().getStorageID())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Volume without a copy of the block. Make a copy now.
|
||||
File sourceBlock = b.getBlockFile();
|
||||
File sourceMeta = b.getMetaFile();
|
||||
String sourceRoot = b.getVolume().getBasePath();
|
||||
String destRoot = v.getBasePath();
|
||||
|
||||
String relativeBlockPath = new File(sourceRoot).toURI().relativize(sourceBlock.toURI()).getPath();
|
||||
String relativeMetaPath = new File(sourceRoot).toURI().relativize(sourceMeta.toURI()).getPath();
|
||||
|
||||
File destBlock = new File(destRoot, relativeBlockPath);
|
||||
File destMeta = new File(destRoot, relativeMetaPath);
|
||||
|
||||
destBlock.getParentFile().mkdirs();
|
||||
FileUtils.copyFile(sourceBlock, destBlock);
|
||||
FileUtils.copyFile(sourceMeta, destMeta);
|
||||
|
||||
if (destBlock.exists() && destMeta.exists()) {
|
||||
LOG.info("Copied " + sourceBlock + " ==> " + destBlock);
|
||||
LOG.info("Copied " + sourceMeta + " ==> " + destMeta);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Get a random blockId that is not used already */
|
||||
private long getFreeBlockId() {
|
||||
long id = rand.nextLong();
|
||||
|
@ -216,6 +261,12 @@ public class TestDirectoryScanner {
|
|||
|
||||
private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
|
||||
long missingMemoryBlocks, long mismatchBlocks) throws IOException {
|
||||
scan(totalBlocks, diffsize, missingMetaFile, missingBlockFile,
|
||||
missingMemoryBlocks, mismatchBlocks, 0);
|
||||
}
|
||||
|
||||
private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
|
||||
long missingMemoryBlocks, long mismatchBlocks, long duplicateBlocks) throws IOException {
|
||||
scanner.reconcile();
|
||||
|
||||
assertTrue(scanner.diffs.containsKey(bpid));
|
||||
|
@ -229,9 +280,92 @@ public class TestDirectoryScanner {
|
|||
assertEquals(missingBlockFile, stats.missingBlockFile);
|
||||
assertEquals(missingMemoryBlocks, stats.missingMemoryBlocks);
|
||||
assertEquals(mismatchBlocks, stats.mismatchBlocks);
|
||||
assertEquals(duplicateBlocks, stats.duplicateBlocks);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test (timeout=300000)
|
||||
public void testRetainBlockOnPersistentStorage() throws Exception {
|
||||
cluster = new MiniDFSCluster
|
||||
.Builder(CONF)
|
||||
.storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
|
||||
.numDataNodes(1)
|
||||
.build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
|
||||
client = cluster.getFileSystem().getClient();
|
||||
scanner = new DirectoryScanner(fds, CONF);
|
||||
scanner.setRetainDiffs(true);
|
||||
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
|
||||
|
||||
// Add a file with 1 block
|
||||
List<LocatedBlock> blocks =
|
||||
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, false);
|
||||
|
||||
// Ensure no difference between volumeMap and disk.
|
||||
scan(1, 0, 0, 0, 0, 0);
|
||||
|
||||
// Make a copy of the block on RAM_DISK and ensure that it is
|
||||
// picked up by the scanner.
|
||||
duplicateBlock(blocks.get(0).getBlock().getBlockId());
|
||||
scan(2, 1, 0, 0, 0, 0, 1);
|
||||
verifyStorageType(blocks.get(0).getBlock().getBlockId(), false);
|
||||
scan(1, 0, 0, 0, 0, 0);
|
||||
|
||||
} finally {
|
||||
if (scanner != null) {
|
||||
scanner.shutdown();
|
||||
scanner = null;
|
||||
}
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
public void testDeleteBlockOnTransientStorage() throws Exception {
|
||||
cluster = new MiniDFSCluster
|
||||
.Builder(CONF)
|
||||
.storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
|
||||
.numDataNodes(1)
|
||||
.build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
|
||||
client = cluster.getFileSystem().getClient();
|
||||
scanner = new DirectoryScanner(fds, CONF);
|
||||
scanner.setRetainDiffs(true);
|
||||
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
|
||||
|
||||
// Create a file file on RAM_DISK
|
||||
List<LocatedBlock> blocks =
|
||||
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, true);
|
||||
|
||||
// Ensure no difference between volumeMap and disk.
|
||||
scan(1, 0, 0, 0, 0, 0);
|
||||
|
||||
// Make a copy of the block on DEFAULT storage and ensure that it is
|
||||
// picked up by the scanner.
|
||||
duplicateBlock(blocks.get(0).getBlock().getBlockId());
|
||||
scan(2, 1, 0, 0, 0, 0, 1);
|
||||
|
||||
// Ensure that the copy on RAM_DISK was deleted.
|
||||
verifyStorageType(blocks.get(0).getBlock().getBlockId(), false);
|
||||
scan(1, 0, 0, 0, 0, 0);
|
||||
|
||||
} finally {
|
||||
if (scanner != null) {
|
||||
scanner.shutdown();
|
||||
scanner = null;
|
||||
}
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout=600000)
|
||||
public void testDirectoryScanner() throws Exception {
|
||||
// Run the test with and without parallel scanning
|
||||
for (int parallelism = 1; parallelism < 3; parallelism++) {
|
||||
|
@ -245,16 +379,17 @@ public class TestDirectoryScanner {
|
|||
cluster.waitActive();
|
||||
bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
|
||||
client = cluster.getFileSystem().getClient();
|
||||
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
|
||||
parallelism);
|
||||
scanner = new DirectoryScanner(fds, CONF);
|
||||
scanner.setRetainDiffs(true);
|
||||
|
||||
// Add files with 100 blocks
|
||||
createFile("/tmp/t1", 10000);
|
||||
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 100, false);
|
||||
long totalBlocks = 100;
|
||||
|
||||
// Test1: No difference between in-memory and disk
|
||||
// Test1: No difference between volumeMap and disk
|
||||
scan(100, 0, 0, 0, 0, 0);
|
||||
|
||||
// Test2: block metafile is missing
|
||||
|
@ -355,7 +490,10 @@ public class TestDirectoryScanner {
|
|||
assertFalse(scanner.getRunStatus());
|
||||
|
||||
} finally {
|
||||
scanner.shutdown();
|
||||
if (scanner != null) {
|
||||
scanner.shutdown();
|
||||
scanner = null;
|
||||
}
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
@ -389,6 +527,13 @@ public class TestDirectoryScanner {
|
|||
assertEquals(genStamp, memBlock.getGenerationStamp());
|
||||
}
|
||||
|
||||
private void verifyStorageType(long blockId, boolean expectTransient) {
|
||||
final ReplicaInfo memBlock;
|
||||
memBlock = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
|
||||
assertNotNull(memBlock);
|
||||
assertThat(memBlock.getVolume().isTransientStorage(), is(expectTransient));
|
||||
}
|
||||
|
||||
private static class TestFsVolumeSpi implements FsVolumeSpi {
|
||||
@Override
|
||||
public String[] getBlockPoolList() {
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Collection;
|
|||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
|
||||
|
@ -62,4 +63,13 @@ public class FsDatasetTestUtil {
|
|||
String bpid) {
|
||||
return ((FsDatasetImpl)fsd).volumeMap.replicas(bpid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the lazy writer daemon that saves RAM disk files to persistent storage.
|
||||
* @param dn
|
||||
*/
|
||||
public static void stopLazyWriter(DataNode dn) {
|
||||
FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
|
||||
((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -368,6 +368,8 @@ public class TestLazyPersistFiles {
|
|||
// Found a persisted copy for this block!
|
||||
boolean added = persistedBlockIds.add(lb.getBlock().getBlockId());
|
||||
assertThat(added, is(true));
|
||||
} else {
|
||||
LOG.error(blockFile + " not found");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -423,7 +425,7 @@ public class TestLazyPersistFiles {
|
|||
final int SEED = 0XFADED;
|
||||
|
||||
// Stop lazy writer to ensure block for path1 is not persisted to disk.
|
||||
stopLazyWriter(cluster.getDataNodes().get(0));
|
||||
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
|
||||
|
||||
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
|
||||
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||
|
@ -488,7 +490,7 @@ public class TestLazyPersistFiles {
|
|||
throws IOException, InterruptedException {
|
||||
startUpCluster(true, -1);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
stopLazyWriter(cluster.getDataNodes().get(0));
|
||||
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
|
||||
|
||||
Path path = new Path("/" + METHOD_NAME + ".dat");
|
||||
makeTestFile(path, BLOCK_SIZE, true);
|
||||
|
@ -682,7 +684,7 @@ public class TestLazyPersistFiles {
|
|||
throws IOException, InterruptedException {
|
||||
|
||||
startUpCluster(true, 1);
|
||||
stopLazyWriter(cluster.getDataNodes().get(0));
|
||||
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
|
||||
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
||||
|
@ -794,12 +796,6 @@ public class TestLazyPersistFiles {
|
|||
return locatedBlocks;
|
||||
}
|
||||
|
||||
private void stopLazyWriter(DataNode dn) {
|
||||
// Stop the lazyWriter daemon.
|
||||
FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
|
||||
((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
|
||||
}
|
||||
|
||||
private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
|
||||
long seed) throws IOException {
|
||||
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
|
||||
|
|
Loading…
Reference in New Issue