diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
index 3670daba113..346f91295d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
@@ -41,3 +41,6 @@
     HDFS-6991. Notify NN of evicted block before deleting it from RAM
     disk. (Arpit Agarwal)

+    HDFS-6978. Directory scanner should correctly reconcile blocks on RAM
+    disk. (Arpit Agarwal)
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index c313b044b20..71f976b05fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -83,6 +83,7 @@ static class Stats {
     long missingBlockFile = 0;
     long missingMemoryBlocks = 0;
     long mismatchBlocks = 0;
+    long duplicateBlocks = 0;

     public Stats(String bpid) {
       this.bpid = bpid;
@@ -440,7 +441,7 @@ void scan() {
       int d = 0; // index for blockpoolReport
       int m = 0; // index for memReprot
       while (m < memReport.length && d < blockpoolReport.length) {
-        Block memBlock = memReport[Math.min(m, memReport.length - 1)];
+        FinalizedReplica memBlock = memReport[Math.min(m, memReport.length - 1)];
         ScanInfo info = blockpoolReport[Math.min(
             d, blockpoolReport.length - 1)];
         if (info.getBlockId() < memBlock.getBlockId()) {
@@ -468,9 +469,23 @@ void scan() {
           // or block file length is different than expected
           statsRecord.mismatchBlocks++;
           addDifference(diffRecord, statsRecord, info);
+        } else if (info.getBlockFile().compareTo(memBlock.getBlockFile()) != 0) {
+          // volumeMap record and on-disk files don't match.
+          statsRecord.duplicateBlocks++;
+          addDifference(diffRecord, statsRecord, info);
         }
         d++;
-        m++;
+
+        if (d < blockpoolReport.length) {
+          // There may be multiple on-disk records for the same block, don't increment
+          // the memory record pointer if so.
+          ScanInfo nextInfo = blockpoolReport[Math.min(d, blockpoolReport.length - 1)];
+          if (nextInfo.getBlockId() != info.blockId) {
+            ++m;
+          }
+        } else {
+          ++m;
+        }
       }
       while (m < memReport.length) {
         FinalizedReplica current = memReport[m++];
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 034376bd686..f39ca167c64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -263,15 +263,16 @@ File addFinalizedBlock(Block b, File f) throws IOException {
   /**
    * Save the given replica to persistent storage.
    *
-   * @param replicaInfo
    * @return The saved meta and block files, in that order.
   * @throws IOException
    */
-  File[] lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException {
+  File[] lazyPersistReplica(long blockId, long genStamp,
+                            File srcMeta, File srcFile) throws IOException {
     if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
       FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
     }
-    File targetFiles[] = FsDatasetImpl.copyBlockFiles(replicaInfo, lazypersistDir);
+    File targetFiles[] = FsDatasetImpl.copyBlockFiles(
+        blockId, genStamp, srcMeta, srcFile, lazypersistDir);
     dfsUsage.incDfsUsed(targetFiles[0].length() + targetFiles[1].length());
     return targetFiles;
   }
@@ -504,7 +505,7 @@ void addToReplicasMap(ReplicaMap volumeMap, File dir,
    * @return the replica that is retained.
    * @throws IOException
    */
-  private ReplicaInfo resolveDuplicateReplicas(
+  ReplicaInfo resolveDuplicateReplicas(
       final ReplicaInfo replica1, final ReplicaInfo replica2,
       final ReplicaMap volumeMap) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 9b44d458ff1..6366a4fb09a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -616,17 +616,16 @@ static File moveBlockFiles(Block b, File srcfile, File destdir)
   }

   /**
-   * Copy the block and meta files for the given block from the given
+   * Copy the block and meta files for the given block to the given destination.
    * @return the new meta and block files.
    * @throws IOException
    */
-  static File[] copyBlockFiles(ReplicaInfo replicaInfo, File destRoot)
+  static File[] copyBlockFiles(long blockId, long genStamp,
+      File srcMeta, File srcFile, File destRoot)
       throws IOException {
-    final File destDir = DatanodeUtil.idToBlockDir(destRoot, replicaInfo.getBlockId());
-    final File dstFile = new File(destDir, replicaInfo.getBlockName());
-    final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, replicaInfo.getGenerationStamp());
-    final File srcMeta = replicaInfo.getMetaFile();
-    final File srcFile = replicaInfo.getBlockFile();
+    final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
+    final File dstFile = new File(destDir, srcFile.getName());
+    final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
     try {
       FileUtils.copyFile(srcMeta, dstMeta);
     } catch (IOException e) {
@@ -1749,10 +1748,20 @@ public void checkAndUpdate(String bpid, long blockId, File diskFile,
         File memFile = memBlockInfo.getBlockFile();
         if (memFile.exists()) {
           if (memFile.compareTo(diskFile) != 0) {
-            LOG.warn("Block file " + memFile.getAbsolutePath()
-                + " does not match file found by scan "
-                + diskFile.getAbsolutePath());
-            // TODO: Should the diskFile be deleted?
+            if (diskMetaFile.exists()) {
+              if (memBlockInfo.getMetaFile().exists()) {
+                // We have two sets of block+meta files. Decide which one to
+                // keep.
+                ReplicaInfo diskBlockInfo = new FinalizedReplica(
+                    blockId, diskFile.length(), diskGS, vol, diskFile.getParentFile());
+                ((FsVolumeImpl) vol).getBlockPoolSlice(bpid).resolveDuplicateReplicas(
+                    memBlockInfo, diskBlockInfo, volumeMap);
+              }
+            } else {
+              if (!diskFile.delete()) {
+                LOG.warn("Failed to delete " + diskFile + ". Will retry on next scan");
+              }
+            }
           }
         } else {
           // Block refers to a block file that does not exist.
@@ -2217,6 +2226,9 @@ private void moveReplicaToNewVolume(String bpid, long blockId)

       FsVolumeImpl targetVolume;
       ReplicaInfo replicaInfo;
+      BlockPoolSlice bpSlice;
+      File srcFile, srcMeta;
+      long genStamp;

       synchronized (FsDatasetImpl.this) {
         replicaInfo = volumeMap.get(bpid, blockId);
@@ -2238,10 +2250,18 @@ private void moveReplicaToNewVolume(String bpid, long blockId)
         }

         lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
-        File[] savedFiles = targetVolume.getBlockPoolSlice(bpid)
-            .lazyPersistReplica(replicaInfo);
-        lazyWriteReplicaTracker.recordEndLazyPersist(
-            bpid, blockId, savedFiles[0], savedFiles[1]);
+        bpSlice = targetVolume.getBlockPoolSlice(bpid);
+        srcMeta = replicaInfo.getMetaFile();
+        srcFile = replicaInfo.getBlockFile();
+        genStamp = replicaInfo.getGenerationStamp();
+      }
+
+      // Drop the FsDatasetImpl lock for the file copy.
+      File[] savedFiles =
+          bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);
+
+      synchronized (FsDatasetImpl.this) {
+        lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);

         if (LOG.isDebugEnabled()) {
           LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
@@ -2262,7 +2282,6 @@ private boolean saveNextReplica() {
       try {
         replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
         if (replicaState != null) {
-          // Move the replica outside the lock.
           moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
         }
         succeeded = true;
@@ -2357,9 +2376,9 @@ private void evictBlocks() throws IOException {
         // Remove the old replicas from transient storage.
         if (blockFile.delete() || !blockFile.exists()) {
           ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, blockFileUsed);
-        }
-        if (metaFile.delete() || !metaFile.exists()) {
-          ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
+          if (metaFile.delete() || !metaFile.exists()) {
+            ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
+          }
         }

         // If deletion failed then the directory scanner will cleanup the blocks
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
index a8ab1b9b804..e8d9c5c63ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
@@ -161,9 +161,13 @@ synchronized void recordStartLazyPersist(
     replicaState.lazyPersistVolume = checkpointVolume;
   }

+  /**
+   * @param bpid
+   * @param blockId
+   * @param savedFiles The saved meta and block files, in that order.
+   */
   synchronized void recordEndLazyPersist(
-      final String bpid, final long blockId,
-      final File savedMetaFile, final File savedBlockFile) {
+      final String bpid, final long blockId, final File[] savedFiles) {
     Map<Long, ReplicaState> map = replicaMaps.get(bpid);
     ReplicaState replicaState = map.get(blockId);

@@ -172,8 +176,8 @@ synchronized void recordEndLazyPersist(
           bpid + "; blockId=" + blockId);
     }
     replicaState.state = State.LAZY_PERSIST_COMPLETE;
-    replicaState.savedMetaFile = savedMetaFile;
-    replicaState.savedBlockFile = savedBlockFile;
+    replicaState.savedMetaFile = savedFiles[0];
+    replicaState.savedBlockFile = savedFiles[1];

     if (replicasNotPersisted.peek() == replicaState) {
       // Common case.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 89649aaee96..b7795b5f4c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;

+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -31,22 +33,21 @@
 import java.util.List;
 import java.util.Random;

+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;

 /**
@@ -60,22 +61,29 @@ public class TestDirectoryScanner {
   private MiniDFSCluster cluster;
   private String bpid;
+  private DFSClient client;
   private FsDatasetSpi fds = null;
   private DirectoryScanner scanner = null;
   private final Random rand = new Random();
   private final Random r = new Random();
+  private static final int BLOCK_LENGTH = 100;

   static {
-    CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
+    CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH);
     CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
     CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
   }

   /** create a file with a length of fileLen */
-  private void createFile(String fileName, long fileLen) throws IOException {
+  private List<LocatedBlock> createFile(String fileNamePrefix,
+                                        long fileLen,
+                                        boolean isLazyPersist) throws IOException {
     FileSystem fs = cluster.getFileSystem();
-    Path filePath = new Path(fileName);
-    DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, r.nextLong());
+    Path filePath = new Path("/" + fileNamePrefix + ".dat");
+    DFSTestUtil.createFile(
+        fs, filePath, isLazyPersist, 1024, fileLen,
+        BLOCK_LENGTH, (short) 1, r.nextLong(), false);
+    return client.getLocatedBlocks(filePath.toString(), 0, fileLen).getLocatedBlocks();
   }

   /** Truncate a block file */
@@ -134,6 +142,43 @@ private long deleteMetaFile() {
     return 0;
   }

+  /**
+   * Duplicate the given block on all volumes.
+   * @param blockId
+   * @throws IOException
+   */
+  private void duplicateBlock(long blockId) throws IOException {
+    synchronized (fds) {
+      ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
+      for (FsVolumeSpi v : fds.getVolumes()) {
+        if (v.getStorageID().equals(b.getVolume().getStorageID())) {
+          continue;
+        }
+
+        // Volume without a copy of the block. Make a copy now.
+        File sourceBlock = b.getBlockFile();
+        File sourceMeta = b.getMetaFile();
+        String sourceRoot = b.getVolume().getBasePath();
+        String destRoot = v.getBasePath();
+
+        String relativeBlockPath = new File(sourceRoot).toURI().relativize(sourceBlock.toURI()).getPath();
+        String relativeMetaPath = new File(sourceRoot).toURI().relativize(sourceMeta.toURI()).getPath();
+
+        File destBlock = new File(destRoot, relativeBlockPath);
+        File destMeta = new File(destRoot, relativeMetaPath);
+
+        destBlock.getParentFile().mkdirs();
+        FileUtils.copyFile(sourceBlock, destBlock);
+        FileUtils.copyFile(sourceMeta, destMeta);
+
+        if (destBlock.exists() && destMeta.exists()) {
+          LOG.info("Copied " + sourceBlock + " ==> " + destBlock);
+          LOG.info("Copied " + sourceMeta + " ==> " + destMeta);
+        }
+      }
+    }
+  }
+
   /** Get a random blockId that is not used already */
   private long getFreeBlockId() {
     long id = rand.nextLong();
@@ -216,6 +261,12 @@ private long createBlockMetaFile() throws IOException {

   private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
       long missingMemoryBlocks, long mismatchBlocks) throws IOException {
+    scan(totalBlocks, diffsize, missingMetaFile, missingBlockFile,
+        missingMemoryBlocks, mismatchBlocks, 0);
+  }
+
+  private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
+      long missingMemoryBlocks, long mismatchBlocks, long duplicateBlocks) throws IOException {
     scanner.reconcile();

     assertTrue(scanner.diffs.containsKey(bpid));
@@ -229,9 +280,92 @@ private void scan(long totalBlocks, int diffsize, long missingMetaFile, long mis
     assertEquals(missingBlockFile, stats.missingBlockFile);
     assertEquals(missingMemoryBlocks, stats.missingMemoryBlocks);
     assertEquals(mismatchBlocks, stats.mismatchBlocks);
+    assertEquals(duplicateBlocks, stats.duplicateBlocks);
   }

-  @Test
+  @Test (timeout=300000)
+  public void testRetainBlockOnPersistentStorage() throws Exception {
+    cluster = new MiniDFSCluster
+        .Builder(CONF)
+        .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
+        .numDataNodes(1)
+        .build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      scanner = new DirectoryScanner(fds, CONF);
+      scanner.setRetainDiffs(true);
+      FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+      // Add a file with 1 block
+      List<LocatedBlock> blocks =
+          createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, false);
+
+      // Ensure no difference between volumeMap and disk.
+      scan(1, 0, 0, 0, 0, 0);
+
+      // Make a copy of the block on RAM_DISK and ensure that it is
+      // picked up by the scanner.
+      duplicateBlock(blocks.get(0).getBlock().getBlockId());
+      scan(2, 1, 0, 0, 0, 0, 1);
+      verifyStorageType(blocks.get(0).getBlock().getBlockId(), false);
+      scan(1, 0, 0, 0, 0, 0);
+
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test (timeout=300000)
+  public void testDeleteBlockOnTransientStorage() throws Exception {
+    cluster = new MiniDFSCluster
+        .Builder(CONF)
+        .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
+        .numDataNodes(1)
+        .build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      scanner = new DirectoryScanner(fds, CONF);
+      scanner.setRetainDiffs(true);
+      FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+      // Create a file on RAM_DISK
+      List<LocatedBlock> blocks =
+          createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, true);
+
+      // Ensure no difference between volumeMap and disk.
+      scan(1, 0, 0, 0, 0, 0);
+
+      // Make a copy of the block on DEFAULT storage and ensure that it is
+      // picked up by the scanner.
+      duplicateBlock(blocks.get(0).getBlock().getBlockId());
+      scan(2, 1, 0, 0, 0, 0, 1);
+
+      // Ensure that the copy on RAM_DISK was deleted.
+      verifyStorageType(blocks.get(0).getBlock().getBlockId(), false);
+      scan(1, 0, 0, 0, 0, 0);
+
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test (timeout=600000)
   public void testDirectoryScanner() throws Exception {
     // Run the test with and without parallel scanning
     for (int parallelism = 1; parallelism < 3; parallelism++) {
@@ -245,16 +379,17 @@ public void runTest(int parallelism) throws Exception {
       cluster.waitActive();
       bpid = cluster.getNamesystem().getBlockPoolId();
       fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
       CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
                   parallelism);
       scanner = new DirectoryScanner(fds, CONF);
       scanner.setRetainDiffs(true);

       // Add files with 100 blocks
-      createFile("/tmp/t1", 10000);
+      createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 100, false);
       long totalBlocks = 100;

-      // Test1: No difference between in-memory and disk
+      // Test1: No difference between volumeMap and disk
       scan(100, 0, 0, 0, 0, 0);

       // Test2: block metafile is missing
@@ -355,7 +490,10 @@ public void runTest(int parallelism) throws Exception {
       assertFalse(scanner.getRunStatus());

     } finally {
-      scanner.shutdown();
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
       cluster.shutdown();
     }
   }
@@ -389,6 +527,13 @@ private void verifyGenStamp(long blockId, long genStamp) {
     assertEquals(genStamp, memBlock.getGenerationStamp());
   }

+  private void verifyStorageType(long blockId, boolean expectTransient) {
+    final ReplicaInfo memBlock;
+    memBlock = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
+    assertNotNull(memBlock);
+    assertThat(memBlock.getVolume().isTransientStorage(), is(expectTransient));
+  }
+
   private static class TestFsVolumeSpi implements FsVolumeSpi {
     @Override
     public String[] getBlockPoolList() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
index d6d7dd7399a..48ddcc29b5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
@@ -23,6 +23,7 @@

 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

@@ -62,4 +63,13 @@ public static Collection getReplicas(FsDatasetSpi fsd, String bpid) {
     return ((FsDatasetImpl)fsd).volumeMap.replicas(bpid);
   }
+
+  /**
+   * Stop the lazy writer daemon that saves RAM disk files to persistent storage.
+   * @param dn
+   */
+  public static void stopLazyWriter(DataNode dn) {
+    FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
+    ((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index fcc47985675..777779f208f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -368,6 +368,8 @@ public void testLazyPersistBlocksAreSaved()
           // Found a persisted copy for this block!
           boolean added = persistedBlockIds.add(lb.getBlock().getBlockId());
           assertThat(added, is(true));
+        } else {
+          LOG.error(blockFile + " not found");
         }
       }
     }
@@ -423,7 +425,7 @@ public void testRamDiskEvictionBeforePersist()
     final int SEED = 0XFADED;

     // Stop lazy writer to ensure block for path1 is not persisted to disk.
-    stopLazyWriter(cluster.getDataNodes().get(0));
+    FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));

     makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
@@ -488,7 +490,7 @@ public void testDeleteBeforePersist()
       throws IOException, InterruptedException {
     startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
-    stopLazyWriter(cluster.getDataNodes().get(0));
+    FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));

     Path path = new Path("/" + METHOD_NAME + ".dat");
     makeTestFile(path, BLOCK_SIZE, true);
@@ -682,7 +684,7 @@ public void testDnRestartWithUnsavedReplicas()
       throws IOException, InterruptedException {
     startUpCluster(true, 1);
-    stopLazyWriter(cluster.getDataNodes().get(0));
+    FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));

     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
@@ -794,12 +796,6 @@ private LocatedBlocks ensureFileReplicasOnStorageType(
     return locatedBlocks;
   }

-  private void stopLazyWriter(DataNode dn) {
-    // Stop the lazyWriter daemon.
-    FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
-    ((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
-  }
-
   private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
                                   long seed) throws IOException {
     DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
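
The DirectoryScanner.scan() change in this patch is, at its core, a two-pointer merge over two reports sorted by block id, where the in-memory pointer is held back until every on-disk record for the current block id has been consumed. The following standalone sketch illustrates that idea only; ReconcileSketch, MemRecord, DiskRecord and reconcile() are made-up names, not Hadoop classes.

import java.util.List;

class ReconcileSketch {
  static class MemRecord  { final long blockId; MemRecord(long id)  { blockId = id; } }
  static class DiskRecord { final long blockId; DiskRecord(long id) { blockId = id; } }

  // Merge two reports sorted by block id. The memory pointer m only advances once
  // every on-disk record with the current block id has been visited, so extra
  // on-disk copies of a block are each seen and can be reported as duplicates.
  static void reconcile(List<MemRecord> mem, List<DiskRecord> disk) {
    int m = 0;
    int d = 0;
    while (m < mem.size() && d < disk.size()) {
      MemRecord memBlock = mem.get(m);
      DiskRecord info = disk.get(d);
      if (info.blockId < memBlock.blockId) {
        // On-disk record with no in-memory entry: report it, move to the next disk record.
        d++;
        continue;
      }
      if (info.blockId > memBlock.blockId) {
        // In-memory entry with no on-disk record: report it, move to the next memory record.
        m++;
        continue;
      }
      // Ids match: this is where lengths, generation stamps and file paths would be
      // compared and any mismatch or duplicate recorded.
      d++;
      if (d >= disk.size() || disk.get(d).blockId != info.blockId) {
        // Advance the memory pointer only when no further on-disk records share this id.
        m++;
      }
    }
    // Whatever remains on either side would be reported as missing counterparts.
  }
}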
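
The moveReplicaToNewVolume() change above also rearranges locking: the source file references are captured while the FsDatasetImpl lock is held, the slow file copy runs with the lock released, and the lock is re-acquired only to record completion. A minimal sketch of that pattern follows; LazyPersistLockSketch and persistReplica() are illustrative names under that assumption, not Hadoop APIs, and only the commons-io FileUtils.copyFile() call mirrors what the patch itself uses.

import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;

class LazyPersistLockSketch {
  private final Object datasetLock = new Object();
  private volatile boolean persisted = false;

  void persistReplica(File srcMeta, File srcBlock, File destDir) throws IOException {
    File meta;
    File block;
    synchronized (datasetLock) {
      // Snapshot the source references under the lock so they cannot change beneath us.
      meta = srcMeta;
      block = srcBlock;
    }

    // Copy outside the lock: disk I/O is slow and must not block other dataset operations.
    File destMeta = new File(destDir, meta.getName());
    File destBlock = new File(destDir, block.getName());
    FileUtils.copyFile(meta, destMeta);
    FileUtils.copyFile(block, destBlock);

    synchronized (datasetLock) {
      // Re-acquire the lock only long enough to record that the persist finished.
      persisted = true;
    }
  }
}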