From b8a2eb793a4cac7a7961204df9d4ca4600cdbcf0 Mon Sep 17 00:00:00 2001 From: arp Date: Mon, 8 Sep 2014 14:29:30 -0700 Subject: [PATCH] HDFS-6991. Notify NN of evicted block before deleting it from RAM disk. (Arpit Agarwal) Conflicts: hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 + .../server/blockmanagement/BlockManager.java | 4 +- .../hdfs/server/datanode/BPOfferService.java | 6 - .../hadoop/hdfs/server/datanode/DataNode.java | 2 +- .../fsdataset/impl/FsDatasetImpl.java | 95 ++++++----- .../apache/hadoop/hdfs/MiniDFSCluster.java | 3 +- .../fsdataset/impl/TestLazyPersistFiles.java | 157 +++++++----------- 7 files changed, 127 insertions(+), 144 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index e12b01a6e09..f6c01806ea7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -129,6 +129,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4; public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec"; public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60; + public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent"; + public static final int DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10; + public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS = "dfs.datanode.ram.disk.low.watermark.replicas"; + public static final int DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT = 3; public static final String DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT = "dfs.namenode.path.based.cache.block.map.allocation.percent"; public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index c4dfa68ecc8..878002d128c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2119,8 +2119,8 @@ public class BlockManager { // Add replica if appropriate. If the replica was previously corrupt // but now okay, it might need to be updated. if (reportedState == ReplicaState.FINALIZED - && (!storedBlock.findDatanode(dn) - || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { + && (storedBlock.findStorageInfo(storageInfo) == -1 || + corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { toAdd.add(storedBlock); } return storedBlock; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 2eab87fd692..1455a8a7312 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -230,7 +230,6 @@ class BPOfferService { void notifyNamenodeReceivedBlock( ExtendedBlock block, String delHint, String storageUuid) { checkBlock(block); - checkDelHint(delHint); ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo( block.getLocalBlock(), ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, @@ -249,11 +248,6 @@ class BPOfferService { block.getBlockPoolId(), getBlockPoolId()); } - private void checkDelHint(String delHint) { - Preconditions.checkArgument(delHint != null, - "delHint is null"); - } - void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) { checkBlock(block); ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index a4db56a21b3..bf098995cdf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -897,7 +897,7 @@ public class DataNode extends ReconfigurableBase } // calls specific to BP - protected void notifyNamenodeReceivedBlock( + public void notifyNamenodeReceivedBlock( ExtendedBlock block, String delHint, String storageUuid) { BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId()); if(bpos != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 19260db3e2d..2b16a65b1c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -47,7 +47,6 @@ import javax.management.StandardMBean; import com.google.common.collect.Lists; import com.google.common.base.Preconditions; -import com.google.common.collect.TreeMultimap; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -282,9 +281,7 @@ class FsDatasetImpl implements FsDatasetSpi { cacheManager = new FsDatasetCache(this); // Start the lazy writer once we have built the replica maps. - lazyWriter = new Daemon(new LazyWriter( - conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, - DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC))); + lazyWriter = new Daemon(new LazyWriter(conf)); lazyWriter.start(); registerMBean(datanode.getDatanodeUuid()); } @@ -2294,16 +2291,23 @@ class FsDatasetImpl implements FsDatasetSpi { private volatile boolean shouldRun = true; final int checkpointerInterval; final long estimateBlockSize; - - public static final int LOW_WATERMARK_FREE_SPACE_PERCENT = 10; - public static final int LOW_WATERMARK_FREE_SPACE_REPLICAS = 3; + final int lowWatermarkFreeSpacePercentage; + final int lowWatermarkFreeSpaceReplicas; - public LazyWriter(final int checkpointerInterval) { - this.checkpointerInterval = checkpointerInterval; + public LazyWriter(Configuration conf) { + this.checkpointerInterval = conf.getInt( + DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, + DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC); this.estimateBlockSize = conf.getLongBytes( DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); + this.lowWatermarkFreeSpacePercentage = conf.getInt( + DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT, + DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT); + this.lowWatermarkFreeSpaceReplicas = conf.getInt( + DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS, + DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT); } private void moveReplicaToNewVolume(String bpid, long blockId) @@ -2390,49 +2394,63 @@ class FsDatasetImpl implements FsDatasetSpi { } int percentFree = (int) (free * 100 / capacity); - return percentFree < LOW_WATERMARK_FREE_SPACE_PERCENT || - free < (estimateBlockSize * LOW_WATERMARK_FREE_SPACE_REPLICAS); + return percentFree < lowWatermarkFreeSpacePercentage || + free < (estimateBlockSize * lowWatermarkFreeSpaceReplicas); } /** * Attempt to evict one or more transient block replicas we have at least * spaceNeeded bytes free. */ - private synchronized void evictBlocks() throws IOException { + private void evictBlocks() throws IOException { int iterations = 0; - LazyWriteReplicaTracker.ReplicaState replicaState = - lazyWriteReplicaTracker.getNextCandidateForEviction(); - - while (replicaState != null && - iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION & + while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION && transientFreeSpaceBelowThreshold()) { + LazyWriteReplicaTracker.ReplicaState replicaState = + lazyWriteReplicaTracker.getNextCandidateForEviction(); + if (LOG.isDebugEnabled()) { - LOG.info("Evicting block " + replicaState); + LOG.debug("Evicting block " + replicaState); } - ReplicaInfo replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId); - Preconditions.checkState(replicaInfo.getVolume().isTransientStorage()); - File blockFile = replicaInfo.getBlockFile(); - File metaFile = replicaInfo.getMetaFile(); - long blockFileUsed = blockFile.length(); - long metaFileUsed = metaFile.length(); - lazyWriteReplicaTracker.discardReplica(replicaState, false); - // Move the replica from lazyPersist/ to finalized/ on target volume - BlockPoolSlice bpSlice = - replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid); - File newBlockFile = bpSlice.activateSavedReplica( - replicaInfo, replicaState.savedBlockFile); + ReplicaInfo replicaInfo, newReplicaInfo; + File blockFile, metaFile; + long blockFileUsed, metaFileUsed; - ReplicaInfo newReplicaInfo = - new FinalizedReplica(replicaInfo.getBlockId(), - replicaInfo.getBytesOnDisk(), - replicaInfo.getGenerationStamp(), - replicaState.lazyPersistVolume, - newBlockFile.getParentFile()); + synchronized (FsDatasetImpl.this) { + replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId); + Preconditions.checkState(replicaInfo.getVolume().isTransientStorage()); + blockFile = replicaInfo.getBlockFile(); + metaFile = replicaInfo.getMetaFile(); + blockFileUsed = blockFile.length(); + metaFileUsed = metaFile.length(); + lazyWriteReplicaTracker.discardReplica(replicaState, false); - // Update the volumeMap entry. This removes the old entry. - volumeMap.add(replicaState.bpid, newReplicaInfo); + // Move the replica from lazyPersist/ to finalized/ on target volume + BlockPoolSlice bpSlice = + replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid); + File newBlockFile = bpSlice.activateSavedReplica( + replicaInfo, replicaState.savedBlockFile); + + newReplicaInfo = + new FinalizedReplica(replicaInfo.getBlockId(), + replicaInfo.getBytesOnDisk(), + replicaInfo.getGenerationStamp(), + replicaState.lazyPersistVolume, + newBlockFile.getParentFile()); + + // Update the volumeMap entry. + volumeMap.add(replicaState.bpid, newReplicaInfo); + } + + // Before deleting the files from transient storage we must notify the + // NN that the files are on the new storage. Else a blockReport from + // the transient storage might cause the NN to think the blocks are lost. + ExtendedBlock extendedBlock = + new ExtendedBlock(replicaState.bpid, newReplicaInfo); + datanode.notifyNamenodeReceivedBlock( + extendedBlock, null, newReplicaInfo.getStorageUuid()); // Remove the old replicas from transient storage. if (blockFile.delete() || !blockFile.exists()) { @@ -2444,7 +2462,6 @@ class FsDatasetImpl implements FsDatasetSpi { // If deletion failed then the directory scanner will cleanup the blocks // eventually. - replicaState = lazyWriteReplicaTracker.getNextCandidateForEviction(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 338744d0a06..703471537b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -1343,6 +1343,7 @@ public class MiniDFSCluster { } int curDatanodesNum = dataNodes.size(); + final int curDatanodesNumSaved = curDatanodesNum; // for mincluster's the default initialDelay for BRs is 0 if (conf.get(DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) { conf.setLong(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0); @@ -1481,7 +1482,7 @@ public class MiniDFSCluster { waitActive(); if (storageCapacities != null) { - for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) { + for (int i = curDatanodesNumSaved; i < curDatanodesNumSaved+numDataNodes; ++i) { final int index = i - curDatanodesNum; List volumes = dns[index].getFSDataset().getVolumes(); assert storageCapacities[index].length == storagesPerDatanode; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java index 7dfba50a968..fcc47985675 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java @@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -71,13 +70,14 @@ public class TestLazyPersistFiles { private static final int THREADPOOL_SIZE = 10; - private static short REPL_FACTOR = 1; + private static final short REPL_FACTOR = 1; private static final int BLOCK_SIZE = 10485760; // 10 MB private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3; private static final long HEARTBEAT_INTERVAL_SEC = 1; private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500; private static final int LAZY_WRITER_INTERVAL_SEC = 1; private static final int BUFFER_LENGTH = 4096; + private static final int EVICTION_LOW_WATERMARK = 1; private MiniDFSCluster cluster; private DistributedFileSystem fs; @@ -101,7 +101,7 @@ public class TestLazyPersistFiles { @Test (timeout=300000) public void testFlagNotSetByDefault() throws IOException { - startUpCluster(REPL_FACTOR, null, -1); + startUpCluster(false, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -113,7 +113,7 @@ public class TestLazyPersistFiles { @Test (timeout=300000) public void testFlagPropagation() throws IOException { - startUpCluster(REPL_FACTOR, null, -1); + startUpCluster(false, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -125,7 +125,7 @@ public class TestLazyPersistFiles { @Test (timeout=300000) public void testFlagPersistenceInEditLog() throws IOException { - startUpCluster(REPL_FACTOR, null, -1); + startUpCluster(false, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -139,10 +139,9 @@ public class TestLazyPersistFiles { @Test (timeout=300000) public void testFlagPersistenceInFsImage() throws IOException { - startUpCluster(REPL_FACTOR, null, -1); + startUpCluster(false, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); - FSDataOutputStream fos = null; makeTestFile(path, 0, true); // checkpoint @@ -158,7 +157,7 @@ public class TestLazyPersistFiles { @Test (timeout=300000) public void testPlacementOnRamDisk() throws IOException { - startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK}, -1); + startUpCluster(true, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -168,8 +167,7 @@ public class TestLazyPersistFiles { @Test (timeout=300000) public void testPlacementOnSizeLimitedRamDisk() throws IOException { - startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK }, - 3 * BLOCK_SIZE -1); // 2 replicas + delta + startUpCluster(true, 3); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); @@ -188,7 +186,7 @@ public class TestLazyPersistFiles { */ @Test (timeout=300000) public void testFallbackToDisk() throws IOException { - startUpCluster(REPL_FACTOR, null, -1); + startUpCluster(false, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -202,7 +200,7 @@ public class TestLazyPersistFiles { */ @Test (timeout=300000) public void testFallbackToDiskFull() throws IOException { - startUpCluster(REPL_FACTOR, null, BLOCK_SIZE - 1); + startUpCluster(false, 0); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -213,15 +211,13 @@ public class TestLazyPersistFiles { /** * File partially fit in RamDisk after eviction. * RamDisk can fit 2 blocks. Write a file with 5 blocks. - * Expect 2 blocks are on RamDisk whereas other 3 on disk. + * Expect 2 or less blocks are on RamDisk and 3 or more on disk. * @throws IOException */ @Test (timeout=300000) public void testFallbackToDiskPartial() throws IOException, InterruptedException { - startUpCluster(REPL_FACTOR, - new StorageType[] { RAM_DISK, DEFAULT }, - BLOCK_SIZE * 3 - 1); + startUpCluster(true, 2); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -241,12 +237,15 @@ public class TestLazyPersistFiles { for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { if (locatedBlock.getStorageTypes()[0] == RAM_DISK) { numBlocksOnRamDisk++; - }else if (locatedBlock.getStorageTypes()[0] == DEFAULT) { + } else if (locatedBlock.getStorageTypes()[0] == DEFAULT) { numBlocksOnDisk++; } } - assertThat(numBlocksOnRamDisk, is(2)); - assertThat(numBlocksOnDisk, is(3)); + + // Since eviction is asynchronous, depending on the timing of eviction + // wrt writes, we may get 2 or less blocks on RAM disk. + assert(numBlocksOnRamDisk <= 2); + assert(numBlocksOnDisk >= 3); } /** @@ -257,7 +256,7 @@ public class TestLazyPersistFiles { */ @Test (timeout=300000) public void testRamDiskNotChosenByDefault() throws IOException { - startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, RAM_DISK}, -1); + startUpCluster(true, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -275,7 +274,7 @@ public class TestLazyPersistFiles { */ @Test (timeout=300000) public void testAppendIsDenied() throws IOException { - startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1); + startUpCluster(true, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -297,17 +296,12 @@ public class TestLazyPersistFiles { @Test (timeout=300000) public void testLazyPersistFilesAreDiscarded() throws IOException, InterruptedException { - startUpCluster(REPL_FACTOR, - new StorageType[] { RAM_DISK, DEFAULT }, - (2 * BLOCK_SIZE - 1)); // 1 replica + delta. + startUpCluster(true, 2); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); - Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); makeTestFile(path1, BLOCK_SIZE, true); - makeTestFile(path2, BLOCK_SIZE, false); ensureFileReplicasOnStorageType(path1, RAM_DISK); - ensureFileReplicasOnStorageType(path2, DEFAULT); // Stop the DataNode and sleep for the time it takes the NN to // detect the DN as being dead. @@ -315,30 +309,28 @@ public class TestLazyPersistFiles { Thread.sleep(30000L); assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1)); - // Next, wait for the replication monitor to mark the file as - // corrupt, plus some delta. + // Next, wait for the replication monitor to mark the file as corrupt Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000); - // Wait for the LazyPersistFileScrubber to run, plus some delta. + // Wait for the LazyPersistFileScrubber to run Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000); // Ensure that path1 does not exist anymore, whereas path2 does. assert(!fs.exists(path1)); - assert(fs.exists(path2)); - // We should have only one block that needs replication i.e. the one + // We should have zero blocks that needs replication i.e. the one // belonging to path2. assertThat(cluster.getNameNode() .getNamesystem() .getBlockManager() .getUnderReplicatedBlocksCount(), - is(1L)); + is(0L)); } @Test (timeout=300000) public void testLazyPersistBlocksAreSaved() throws IOException, InterruptedException { - startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1); + startUpCluster(true, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -386,16 +378,12 @@ public class TestLazyPersistFiles { /** * RamDisk eviction after lazy persist to disk. - * Evicted blocks are still readable with on-disk replicas. * @throws IOException * @throws InterruptedException */ - @Test (timeout=300000) - public void testRamDiskEviction() - throws IOException, InterruptedException { - startUpCluster(REPL_FACTOR, - new StorageType[] { RAM_DISK, DEFAULT }, - (2 * BLOCK_SIZE - 1)); // 1 replica + delta. + @Test (timeout=300000) + public void testRamDiskEviction() throws IOException, InterruptedException { + startUpCluster(true, 1 + EVICTION_LOW_WATERMARK); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); @@ -405,16 +393,16 @@ public class TestLazyPersistFiles { ensureFileReplicasOnStorageType(path1, RAM_DISK); // Sleep for a short time to allow the lazy writer thread to do its job. - // However the block replica should not be evicted from RAM_DISK yet. Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); ensureFileReplicasOnStorageType(path1, RAM_DISK); // Create another file with a replica on RAM_DISK. makeTestFile(path2, BLOCK_SIZE, true); + Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); triggerBlockReport(); - // Make sure that the second file's block replica is on RAM_DISK, whereas - // the original file's block replica is now on disk. + // Ensure the first file was evicted to disk, the second is still on + // RAM_DISK. ensureFileReplicasOnStorageType(path2, RAM_DISK); ensureFileReplicasOnStorageType(path1, DEFAULT); } @@ -428,9 +416,7 @@ public class TestLazyPersistFiles { @Test (timeout=300000) public void testRamDiskEvictionBeforePersist() throws IOException, InterruptedException { - // 1 replica + delta, lazy persist interval every 50 minutes - startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, - (2 * BLOCK_SIZE - 1)); + startUpCluster(true, 1); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); @@ -463,8 +449,7 @@ public class TestLazyPersistFiles { @Test (timeout=300000) public void testRamDiskEvictionLRU() throws IOException, InterruptedException { - startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, - (4 * BLOCK_SIZE -1)); // 3 replica + delta. + startUpCluster(true, 3); final String METHOD_NAME = GenericTestUtils.getMethodName(); final int NUM_PATHS = 6; Path paths[] = new Path[NUM_PATHS]; @@ -501,8 +486,7 @@ public class TestLazyPersistFiles { @Test (timeout=300000) public void testDeleteBeforePersist() throws IOException, InterruptedException { - startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, - -1); + startUpCluster(true, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); stopLazyWriter(cluster.getDataNodes().get(0)); @@ -527,7 +511,7 @@ public class TestLazyPersistFiles { @Test (timeout=300000) public void testDeleteAfterPersist() throws IOException, InterruptedException { - startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1); + startUpCluster(true, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -554,8 +538,7 @@ public class TestLazyPersistFiles { @Test (timeout=300000) public void testDfsUsageCreateDelete() throws IOException, InterruptedException { - startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, - 5 * BLOCK_SIZE - 1); // 4 replica + delta + startUpCluster(true, 4); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -586,8 +569,7 @@ public class TestLazyPersistFiles { @Test (timeout=300000) public void testConcurrentRead() throws Exception { - startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK }, - 3 * BLOCK_SIZE -1); // 2 replicas + delta + startUpCluster(true, 2); final String METHOD_NAME = GenericTestUtils.getMethodName(); final Path path1 = new Path("/" + METHOD_NAME + ".dat"); @@ -638,8 +620,7 @@ public class TestLazyPersistFiles { @Test (timeout=300000) public void testConcurrentWrites() throws IOException, InterruptedException { - startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, - (10 * BLOCK_SIZE -1)); // 9 replica + delta. + startUpCluster(true, 9); final String METHOD_NAME = GenericTestUtils.getMethodName(); final int SEED = 0xFADED; final int NUM_WRITERS = 4; @@ -659,8 +640,7 @@ public class TestLazyPersistFiles { ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE); for (int i = 0; i < NUM_WRITERS; i++) { - Runnable writer = new WriterRunnable(cluster, i, paths[i], SEED, latch, - testFailed); + Runnable writer = new WriterRunnable(i, paths[i], SEED, latch, testFailed); executor.execute(writer); } @@ -677,9 +657,7 @@ public class TestLazyPersistFiles { public void testDnRestartWithSavedReplicas() throws IOException, InterruptedException { - startUpCluster(REPL_FACTOR, - new StorageType[] {RAM_DISK, DEFAULT }, - (2 * BLOCK_SIZE - 1)); // 1 replica + delta. + startUpCluster(true, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); @@ -703,9 +681,7 @@ public class TestLazyPersistFiles { public void testDnRestartWithUnsavedReplicas() throws IOException, InterruptedException { - startUpCluster(REPL_FACTOR, - new StorageType[] {RAM_DISK, DEFAULT }, - (2 * BLOCK_SIZE - 1)); // 1 replica + delta. + startUpCluster(true, 1); stopLazyWriter(cluster.getDataNodes().get(0)); final String METHOD_NAME = GenericTestUtils.getMethodName(); @@ -727,9 +703,8 @@ public class TestLazyPersistFiles { * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially * capped. If ramDiskStorageLimit < 0 then it is ignored. */ - private void startUpCluster(final int numDataNodes, - final StorageType[] storageTypes, - final long ramDiskStorageLimit, + private void startUpCluster(boolean hasTransientStorage, + final int ramDiskReplicaCapacity, final boolean useSCR) throws IOException { @@ -739,42 +714,36 @@ public class TestLazyPersistFiles { LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC); conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC); conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, - HEARTBEAT_RECHECK_INTERVAL_MSEC); + HEARTBEAT_RECHECK_INTERVAL_MSEC); conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, - LAZY_WRITER_INTERVAL_SEC); + LAZY_WRITER_INTERVAL_SEC); + conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS, + EVICTION_LOW_WATERMARK); - conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR); + conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR); - REPL_FACTOR = 1; //Reset in case a test has modified the value + long[] capacities = null; + if (hasTransientStorage && ramDiskReplicaCapacity >= 0) { + // Convert replica count to byte count, add some delta for .meta and VERSION files. + long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) + (BLOCK_SIZE - 1); + capacities = new long[] { ramDiskStorageLimit, -1 }; + } cluster = new MiniDFSCluster .Builder(conf) - .numDataNodes(numDataNodes) - .storageTypes(storageTypes != null ? storageTypes : new StorageType[] { DEFAULT, DEFAULT }) + .numDataNodes(REPL_FACTOR) + .storageCapacities(capacities) + .storageTypes(hasTransientStorage ? new StorageType[]{ RAM_DISK, DEFAULT } : null) .build(); fs = cluster.getFileSystem(); client = fs.getClient(); - - // Artificially cap the storage capacity of the RAM_DISK volume. - if (ramDiskStorageLimit >= 0) { - List volumes = - cluster.getDataNodes().get(0).getFSDataset().getVolumes(); - - for (FsVolumeSpi volume : volumes) { - if (volume.getStorageType() == RAM_DISK) { - ((FsTransientVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit); - } - } - } - LOG.info("Cluster startup complete"); } - private void startUpCluster(final int numDataNodes, - final StorageType[] storageTypes, - final long ramDiskStorageLimit) + private void startUpCluster(boolean hasTransientStorage, + final int ramDiskReplicaCapacity) throws IOException { - startUpCluster(numDataNodes, storageTypes, ramDiskStorageLimit, false); + startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false); } private void makeTestFile(Path path, long length, final boolean isLazyPersist) @@ -908,17 +877,15 @@ public class TestLazyPersistFiles { class WriterRunnable implements Runnable { private final int id; - private final MiniDFSCluster cluster; private final Path paths[]; private final int seed; private CountDownLatch latch; private AtomicBoolean bFail; - public WriterRunnable(MiniDFSCluster cluster, int threadIndex, Path[] paths, + public WriterRunnable(int threadIndex, Path[] paths, int seed, CountDownLatch latch, AtomicBoolean bFail) { id = threadIndex; - this.cluster = cluster; this.paths = paths; this.seed = seed; this.latch = latch;