From c7d022b66f0c5baafbb7000a435c1d6e39906efe Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Sat, 20 Jun 2015 13:27:52 -0700 Subject: [PATCH] HDFS-8192. Eviction should key off used locked memory instead of ram disk free space. (Contributed by Arpit Agarwal) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 - .../fsdataset/impl/FsDatasetCache.java | 7 + .../fsdataset/impl/FsDatasetImpl.java | 98 ++++----- .../hdfs/server/balancer/TestBalancer.java | 1 - .../fsdataset/impl/LazyPersistTestCase.java | 42 ++-- .../impl/TestLazyPersistLockedMemory.java | 25 ++- .../impl/TestLazyPersistReplicaPlacement.java | 36 +++- .../fsdataset/impl/TestLazyWriter.java | 68 +++--- .../impl/TestScrLazyPersistFiles.java | 193 +++++++----------- 10 files changed, 227 insertions(+), 250 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index aad3c258246..2e030b9aa3c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -658,6 +658,9 @@ Release 2.8.0 - UNRELEASED do not generate spurious reconfig warnings (Lei (Eddy) Xu via Colin P. McCabe) + HDFS-8192. Eviction should key off used locked memory instead of + ram disk free space. (Arpit Agarwal) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 5ce28632612..30540a9439d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -94,10 +94,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60; public static final String DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker"; public static final Class DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class; - public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent"; - public static final float DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10.0f; - public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES = "dfs.datanode.ram.disk.low.watermark.bytes"; - public static final long DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT = DFS_BLOCK_SIZE_DEFAULT; public static final String DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY = "dfs.datanode.network.counts.cache.max.size"; public static final int DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT = Integer.MAX_VALUE; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index 6f524b28907..f70d4afe297 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -404,6 +404,13 @@ public class FsDatasetCache { return usedBytesCount.rounder.osPageSize; } + /** + * Round up to the OS page size. 
+ */ + long roundUpPageSize(long count) { + return usedBytesCount.rounder.roundUp(count); + } + /** * Background worker that mmaps, mlocks, and checksums a block */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 8ebd2147e7e..a1ff918b054 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1302,14 +1302,13 @@ class FsDatasetImpl implements FsDatasetSpi { if (allowLazyPersist && lazyWriter != null && b.getNumBytes() % cacheManager.getOsPageSize() == 0 && - (cacheManager.reserve(b.getNumBytes())) > 0) { + reserveLockedMemory(b.getNumBytes())) { try { // First try to place the block on a transient volume. ref = volumes.getNextTransientVolume(b.getNumBytes()); datanode.getMetrics().incrRamDiskBlocksWrite(); } catch(DiskOutOfSpaceException de) { // Ignore the exception since we just fall back to persistent storage. - datanode.getMetrics().incrRamDiskBlocksWriteFallback(); } finally { if (ref == null) { cacheManager.release(b.getNumBytes()); @@ -1323,6 +1322,11 @@ class FsDatasetImpl implements FsDatasetSpi { FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); // create an rbw file to hold block in the designated volume + + if (allowLazyPersist && !v.isTransientStorage()) { + datanode.getMetrics().incrRamDiskBlocksWriteFallback(); + } + File f; try { f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); @@ -2833,20 +2837,11 @@ class FsDatasetImpl implements FsDatasetSpi { class LazyWriter implements Runnable { private volatile boolean shouldRun = true; final int checkpointerInterval; - final float lowWatermarkFreeSpacePercentage; - final long lowWatermarkFreeSpaceBytes; - public LazyWriter(Configuration conf) { this.checkpointerInterval = conf.getInt( DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC); - this.lowWatermarkFreeSpacePercentage = conf.getFloat( - DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT, - DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT); - this.lowWatermarkFreeSpaceBytes = conf.getLong( - DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, - DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT); } /** @@ -2908,41 +2903,17 @@ class FsDatasetImpl implements FsDatasetSpi { return succeeded; } - private boolean transientFreeSpaceBelowThreshold() throws IOException { - long free = 0; - long capacity = 0; - float percentFree = 0.0f; - - // Don't worry about fragmentation for now. We don't expect more than one - // transient volume per DN. - try (FsVolumeReferences volumes = getFsVolumeReferences()) { - for (FsVolumeSpi fvs : volumes) { - FsVolumeImpl v = (FsVolumeImpl) fvs; - if (v.isTransientStorage()) { - capacity += v.getCapacity(); - free += v.getAvailable(); - } - } - } - - if (capacity == 0) { - return false; - } - - percentFree = (float) ((double)free * 100 / capacity); - return (percentFree < lowWatermarkFreeSpacePercentage) || - (free < lowWatermarkFreeSpaceBytes); - } - /** * Attempt to evict one or more transient block replicas until we - * have at least spaceNeeded bytes free. + * have at least bytesNeeded bytes free. 
*/ - private void evictBlocks() throws IOException { + public void evictBlocks(long bytesNeeded) throws IOException { int iterations = 0; + final long cacheCapacity = cacheManager.getCacheCapacity(); + while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION && - transientFreeSpaceBelowThreshold()) { + (cacheCapacity - cacheManager.getCacheUsed()) < bytesNeeded) { RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction(); if (replicaState == null) { @@ -2959,7 +2930,8 @@ class FsDatasetImpl implements FsDatasetSpi { final String bpid = replicaState.getBlockPoolId(); synchronized (FsDatasetImpl.this) { - replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId()); + replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), + replicaState.getBlockId()); Preconditions.checkState(replicaInfo.getVolume().isTransientStorage()); blockFile = replicaInfo.getBlockFile(); metaFile = replicaInfo.getMetaFile(); @@ -2968,7 +2940,8 @@ class FsDatasetImpl implements FsDatasetSpi { ramDiskReplicaTracker.discardReplica(replicaState.getBlockPoolId(), replicaState.getBlockId(), false); - // Move the replica from lazyPersist/ to finalized/ on target volume + // Move the replica from lazyPersist/ to finalized/ on + // the target volume BlockPoolSlice bpSlice = replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid); File newBlockFile = bpSlice.activateSavedReplica( @@ -2992,10 +2965,12 @@ class FsDatasetImpl implements FsDatasetSpi { if (replicaState.getNumReads() == 0) { datanode.getMetrics().incrRamDiskBlocksEvictedWithoutRead(); } - } - removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile, - blockFileUsed, metaFileUsed, bpid); + // Delete the block+meta files from RAM disk and release locked + // memory. + removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile, + blockFileUsed, metaFileUsed, bpid); + } } } @@ -3006,7 +2981,6 @@ class FsDatasetImpl implements FsDatasetSpi { while (fsRunning && shouldRun) { try { numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1); - evictBlocks(); // Sleep if we have no more work to do or if it looks like we are not // making any forward progress. This is to ensure that if all persist @@ -3094,5 +3068,37 @@ class FsDatasetImpl implements FsDatasetSpi { cacheManager.releaseRoundDown(count); } } + + /** + * Attempt to evict blocks from the cache manager to free the requested + * bytes. + * + * @param bytesNeeded the number of bytes to free up + */ + @VisibleForTesting + public void evictLazyPersistBlocks(long bytesNeeded) { + try { + ((LazyWriter) lazyWriter.getRunnable()).evictBlocks(bytesNeeded); + } catch(IOException ioe) { + LOG.info("Ignoring exception ", ioe); + } + } + + /** + * Attempt to reserve the given amount of memory with the cache manager. + * @param bytesNeeded the number of bytes to reserve + * @return true if the reservation succeeded, false otherwise + */ + boolean reserveLockedMemory(long bytesNeeded) { + if (cacheManager.reserve(bytesNeeded) > 0) { + return true; + } + + // Round up bytes needed to osPageSize and attempt to evict + // one or more blocks to free up the reservation.
+ bytesNeeded = cacheManager.roundUpPageSize(bytesNeeded); + evictLazyPersistBlocks(bytesNeeded); + return cacheManager.reserve(bytesNeeded) > 0; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 1f7bade5df0..e1ce1b3d6e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -130,7 +130,6 @@ public class TestBalancer { conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1); - conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, DEFAULT_RAM_DISK_BLOCK_SIZE); LazyPersistTestCase.initCacheManipulator(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java index 5ce5cc6e7fc..ce29fc89d87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import com.google.common.base.Supplier; +import org.apache.commons.lang.UnhandledException; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import static org.apache.hadoop.fs.CreateFlag.CREATE; @@ -37,6 +39,7 @@ import java.util.EnumSet; import java.util.HashSet; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeoutException; import com.google.common.base.Preconditions; import org.apache.commons.io.IOUtils; @@ -55,6 +58,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; @@ -79,7 +83,6 @@ public abstract class LazyPersistTestCase { protected static final int BLOCK_SIZE = 5 * 1024 * 1024; protected static final int BUFFER_LENGTH = 4096; - protected static final int EVICTION_LOW_WATERMARK = 1; private static final long HEARTBEAT_INTERVAL_SEC = 1; private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500; private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk"; @@ -236,7 +239,6 @@ public abstract class LazyPersistTestCase { StorageType[] storageTypes, int ramDiskReplicaCapacity, long ramDiskStorageLimit, - long evictionLowWatermarkReplicas, long maxLockedMemory, boolean useSCR, boolean useLegacyBlockReaderLocal, @@ -256,8 +258,6 @@ public abstract class LazyPersistTestCase { HEARTBEAT_RECHECK_INTERVAL_MSEC); conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, LAZY_WRITER_INTERVAL_SEC); - conf.setLong(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, - evictionLowWatermarkReplicas * BLOCK_SIZE); 
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1); conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, maxLockedMemory); @@ -389,12 +389,6 @@ public abstract class LazyPersistTestCase { return this; } - public ClusterWithRamDiskBuilder setEvictionLowWatermarkReplicas( - long evictionLowWatermarkReplicas) { - this.evictionLowWatermarkReplicas = evictionLowWatermarkReplicas; - return this; - } - public ClusterWithRamDiskBuilder disableScrubber() { this.disableScrubber = true; return this; @@ -403,8 +397,8 @@ public abstract class LazyPersistTestCase { public void build() throws IOException { LazyPersistTestCase.this.startUpCluster( numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity, - ramDiskStorageLimit, evictionLowWatermarkReplicas, - maxLockedMemory, useScr, useLegacyBlockReaderLocal, disableScrubber); + ramDiskStorageLimit, maxLockedMemory, useScr, useLegacyBlockReaderLocal, + disableScrubber); } private int numDatanodes = REPL_FACTOR; @@ -415,7 +409,6 @@ public abstract class LazyPersistTestCase { private boolean hasTransientStorage = true; private boolean useScr = false; private boolean useLegacyBlockReaderLocal = false; - private long evictionLowWatermarkReplicas = EVICTION_LOW_WATERMARK; private boolean disableScrubber=false; } @@ -513,4 +506,27 @@ public abstract class LazyPersistTestCase { e.printStackTrace(); } } + + protected void waitForMetric(final String metricName, final int expectedValue) + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + final int currentValue = Integer.parseInt(jmx.getValue(metricName)); + LOG.info("Waiting for " + metricName + + " to reach value " + expectedValue + + ", current value = " + currentValue); + return currentValue == expectedValue; + } catch (Exception e) { + throw new UnhandledException("Test failed due to unexpected exception", e); + } + } + }, 1000, Integer.MAX_VALUE); + } + + protected void triggerEviction(DataNode dn) { + FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset(); + fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle. + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java index 9ea4665fcb9..eef8f0bbf71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java @@ -28,9 +28,7 @@ import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; -import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.MetricsAsserts; import org.junit.Test; import java.io.IOException; @@ -103,25 +101,26 @@ public class TestLazyPersistLockedMemory extends LazyPersistTestCase { * Verify that locked RAM is released when blocks are evicted from RAM disk. 
*/ @Test - public void testReleaseOnEviction() - throws IOException, TimeoutException, InterruptedException { + public void testReleaseOnEviction() throws Exception { getClusterBuilder().setNumDatanodes(1) .setMaxLockedMemory(BLOCK_SIZE) .setRamDiskReplicaCapacity(BLOCK_SIZE * 2 - 1) .build(); final String METHOD_NAME = GenericTestUtils.getMethodName(); - final FsDatasetSpi fsd = cluster.getDataNodes().get(0).getFSDataset(); + final FsDatasetImpl fsd = + (FsDatasetImpl) cluster.getDataNodes().get(0).getFSDataset(); - Path path = new Path("/" + METHOD_NAME + ".dat"); - makeTestFile(path, BLOCK_SIZE, true); + Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); + makeTestFile(path1, BLOCK_SIZE, true); + assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE)); - // The block should get evicted soon since it pushes RAM disk free - // space below the threshold. + // Wait until the replica is written to persistent storage. + waitForMetric("RamDiskBlocksLazyPersisted", 1); + + // Trigger eviction and verify locked bytes were released. + fsd.evictLazyPersistBlocks(Long.MAX_VALUE); + verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1); waitForLockedBytesUsed(fsd, 0); - - MetricsRecordBuilder rb = - MetricsAsserts.getMetrics(cluster.getDataNodes().get(0).getMetrics().name()); - MetricsAsserts.assertCounter("RamDiskBlocksEvicted", 1L, rb); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java index 018eabafe61..c89475aee8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.test.GenericTestUtils; @@ -28,6 +29,8 @@ import java.io.IOException; import static org.apache.hadoop.fs.StorageType.DEFAULT; import static org.apache.hadoop.fs.StorageType.RAM_DISK; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.fail; public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase { @@ -70,32 +73,50 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase { ensureFileReplicasOnStorageType(path, DEFAULT); } + @Test + public void testSynchronousEviction() throws Exception { + getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE).build(); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + + final Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); + makeTestFile(path1, BLOCK_SIZE, true); + ensureFileReplicasOnStorageType(path1, RAM_DISK); + + // Wait until the replica is written to persistent storage. + waitForMetric("RamDiskBlocksLazyPersisted", 1); + + // Ensure that writing a new file to RAM DISK evicts the block + // for the previous one. 
+ Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); + makeTestFile(path2, BLOCK_SIZE, true); + verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1); + } + /** * File can not fit in RamDisk even with eviction * @throws IOException */ @Test public void testFallbackToDiskFull() throws Exception { - getClusterBuilder().setRamDiskReplicaCapacity(0).build(); + getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE / 2).build(); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); makeTestFile(path, BLOCK_SIZE, true); ensureFileReplicasOnStorageType(path, DEFAULT); - verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 1); } /** * File partially fit in RamDisk after eviction. * RamDisk can fit 2 blocks. Write a file with 5 blocks. - * Expect 2 or less blocks are on RamDisk and 3 or more on disk. + * Expect 2 blocks are on RamDisk and rest on disk. * @throws IOException */ @Test public void testFallbackToDiskPartial() throws IOException, InterruptedException { - getClusterBuilder().setRamDiskReplicaCapacity(2).build(); + getClusterBuilder().setMaxLockedMemory(2 * BLOCK_SIZE).build(); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -122,8 +143,8 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase { // Since eviction is asynchronous, depending on the timing of eviction // wrt writes, we may get 2 or less blocks on RAM disk. - assert(numBlocksOnRamDisk <= 2); - assert(numBlocksOnDisk >= 3); + assertThat(numBlocksOnRamDisk, is(2)); + assertThat(numBlocksOnDisk, is(3)); } /** @@ -134,7 +155,8 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase { */ @Test public void testRamDiskNotChosenByDefault() throws IOException { - getClusterBuilder().build(); + getClusterBuilder().setStorageTypes(new StorageType[] {RAM_DISK, RAM_DISK}) + .build(); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java index ee8aaf08c36..6b16066e8c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java @@ -28,6 +28,7 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.concurrent.TimeoutException; import static org.apache.hadoop.fs.StorageType.DEFAULT; import static org.apache.hadoop.fs.StorageType.RAM_DISK; @@ -38,18 +39,16 @@ import static org.junit.Assert.assertTrue; public class TestLazyWriter extends LazyPersistTestCase { @Test public void testLazyPersistBlocksAreSaved() - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { getClusterBuilder().build(); + final int NUM_BLOCKS = 10; final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); // Create a test file - makeTestFile(path, BLOCK_SIZE * 10, true); + makeTestFile(path, BLOCK_SIZE * NUM_BLOCKS, true); LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK); - - // Sleep for a short time 
to allow the lazy writer thread to do its job - Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000); - + waitForMetric("RamDiskBlocksLazyPersisted", NUM_BLOCKS); LOG.info("Verifying copy was saved to lazyPersist/"); // Make sure that there is a saved copy of the replica on persistent @@ -57,35 +56,22 @@ public class TestLazyWriter extends LazyPersistTestCase { ensureLazyPersistBlocksAreSaved(locatedBlocks); } - /** - * RamDisk eviction after lazy persist to disk. - * @throws Exception - */ @Test - public void testRamDiskEviction() throws Exception { - getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK).build(); + public void testSynchronousEviction() throws Exception { + getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE).build(); final String METHOD_NAME = GenericTestUtils.getMethodName(); - Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); + + final Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); + makeTestFile(path1, BLOCK_SIZE, true); + ensureFileReplicasOnStorageType(path1, RAM_DISK); + + // Wait until the replica is written to persistent storage. + waitForMetric("RamDiskBlocksLazyPersisted", 1); + + // Ensure that writing a new file to RAM DISK evicts the block + // for the previous one. Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); - - final int SEED = 0xFADED; - makeRandomTestFile(path1, BLOCK_SIZE, true, SEED); - ensureFileReplicasOnStorageType(path1, RAM_DISK); - - // Sleep for a short time to allow the lazy writer thread to do its job. - Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); - ensureFileReplicasOnStorageType(path1, RAM_DISK); - - // Create another file with a replica on RAM_DISK. makeTestFile(path2, BLOCK_SIZE, true); - Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); - triggerBlockReport(); - - // Ensure the first file was evicted to disk, the second is still on - // RAM_DISK. - ensureFileReplicasOnStorageType(path2, RAM_DISK); - ensureFileReplicasOnStorageType(path1, DEFAULT); - verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1); verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1); } @@ -98,8 +84,8 @@ public class TestLazyWriter extends LazyPersistTestCase { */ @Test public void testRamDiskEvictionBeforePersist() - throws IOException, InterruptedException { - getClusterBuilder().setRamDiskReplicaCapacity(1).build(); + throws Exception { + getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE).build(); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); @@ -116,6 +102,7 @@ public class TestLazyWriter extends LazyPersistTestCase { // Eviction should not happen for block of the first file that is not // persisted yet. + verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 0); ensureFileReplicasOnStorageType(path1, RAM_DISK); ensureFileReplicasOnStorageType(path2, DEFAULT); @@ -133,7 +120,7 @@ public class TestLazyWriter extends LazyPersistTestCase { public void testRamDiskEvictionIsLru() throws Exception { final int NUM_PATHS = 5; - getClusterBuilder().setRamDiskReplicaCapacity(NUM_PATHS + EVICTION_LOW_WATERMARK).build(); + getClusterBuilder().setMaxLockedMemory(NUM_PATHS * BLOCK_SIZE).build(); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path paths[] = new Path[NUM_PATHS * 2]; @@ -145,8 +132,7 @@ public class TestLazyWriter extends LazyPersistTestCase { makeTestFile(paths[i], BLOCK_SIZE, true); } - // Sleep for a short time to allow the lazy writer thread to do its job. 
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); + waitForMetric("RamDiskBlocksLazyPersisted", NUM_PATHS); for (int i = 0; i < NUM_PATHS; ++i) { ensureFileReplicasOnStorageType(paths[i], RAM_DISK); @@ -227,16 +213,13 @@ public class TestLazyWriter extends LazyPersistTestCase { makeTestFile(path, BLOCK_SIZE, true); LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK); - - // Sleep for a short time to allow the lazy writer thread to do its job - Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000); + waitForMetric("RamDiskBlocksLazyPersisted", 1); // Delete after persist client.delete(path.toString(), false); Assert.assertFalse(fs.exists(path)); assertThat(verifyDeletedBlocks(locatedBlocks), is(true)); - verifyRamDiskJMXMetric("RamDiskBlocksLazyPersisted", 1); verifyRamDiskJMXMetric("RamDiskBytesLazyPersisted", BLOCK_SIZE); } @@ -248,7 +231,7 @@ public class TestLazyWriter extends LazyPersistTestCase { */ @Test public void testDfsUsageCreateDelete() - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { getClusterBuilder().setRamDiskReplicaCapacity(4).build(); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -261,8 +244,7 @@ public class TestLazyWriter extends LazyPersistTestCase { assertThat(usedAfterCreate, is((long) BLOCK_SIZE)); - // Sleep for a short time to allow the lazy writer thread to do its job - Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); + waitForMetric("RamDiskBlocksLazyPersisted", 1); long usedAfterPersist = fs.getUsed(); assertThat(usedAfterPersist, is((long) BLOCK_SIZE)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java index 7c7ba643697..25125884ff2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java @@ -16,6 +16,7 @@ * limitations under the License. */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import com.google.common.base.Preconditions; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; @@ -26,6 +27,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.NativeCodeLoader; @@ -39,13 +41,20 @@ import org.junit.rules.ExpectedException; import java.io.File; import java.io.IOException; +import java.util.concurrent.TimeoutException; import static org.apache.hadoop.fs.StorageType.DEFAULT; import static org.apache.hadoop.fs.StorageType.RAM_DISK; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +/** + * Test Lazy persist behavior with short-circuit reads. These tests + * will be run on Linux only with Native IO enabled. 
The tests fake + * RAM_DISK storage using local disk. + */ public class TestScrLazyPersistFiles extends LazyPersistTestCase { @BeforeClass @@ -58,6 +67,10 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase { Assume.assumeThat(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS, equalTo(true)); Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); + + final long osPageSize = NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize(); + Preconditions.checkState(BLOCK_SIZE >= osPageSize); + Preconditions.checkState(BLOCK_SIZE % osPageSize == 0); } @Rule @@ -69,35 +82,27 @@ */ @Test public void testRamDiskShortCircuitRead() - throws IOException, InterruptedException { - getClusterBuilder().setNumDatanodes(REPL_FACTOR) - .setStorageTypes(new StorageType[]{RAM_DISK, DEFAULT}) - .setRamDiskStorageLimit(2 * BLOCK_SIZE - 1) - .setUseScr(true) - .build(); + throws IOException, InterruptedException, TimeoutException { + getClusterBuilder().setUseScr(true).build(); final String METHOD_NAME = GenericTestUtils.getMethodName(); final int SEED = 0xFADED; Path path = new Path("/" + METHOD_NAME + ".dat"); + // Create a file and wait till it is persisted. makeRandomTestFile(path, BLOCK_SIZE, true, SEED); ensureFileReplicasOnStorageType(path, RAM_DISK); + waitForMetric("RamDiskBlocksLazyPersisted", 1); - // Sleep for a short time to allow the lazy writer thread to do its job - Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); - - //assertThat(verifyReadRandomFile(path, BLOCK_SIZE, SEED), is(true)); - FSDataInputStream fis = fs.open(path); + HdfsDataInputStream fis = (HdfsDataInputStream) fs.open(path); // Verify SCR read counters try { - fis = fs.open(path); byte[] buf = new byte[BUFFER_LENGTH]; fis.read(0, buf, 0, BUFFER_LENGTH); - HdfsDataInputStream dfsis = (HdfsDataInputStream) fis; Assert.assertEquals(BUFFER_LENGTH, - dfsis.getReadStatistics().getTotalBytesRead()); + fis.getReadStatistics().getTotalBytesRead()); Assert.assertEquals(BUFFER_LENGTH, - dfsis.getReadStatistics().getTotalShortCircuitBytesRead()); + fis.getReadStatistics().getTotalShortCircuitBytesRead()); } finally { fis.close(); fis = null; @@ -111,106 +116,77 @@ * @throws InterruptedException */ @Test - public void testRamDiskEvictionWithShortCircuitReadHandle() - throws IOException, InterruptedException { - // 5 replica + delta, SCR. - getClusterBuilder().setNumDatanodes(REPL_FACTOR) - .setStorageTypes(new StorageType[]{RAM_DISK, DEFAULT}) - .setRamDiskStorageLimit(6 * BLOCK_SIZE - 1) - .setEvictionLowWatermarkReplicas(3) - .setUseScr(true) - .build(); - + public void testScrDuringEviction() + throws Exception { + getClusterBuilder().setUseScr(true).build(); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); - Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); - final int SEED = 0xFADED; - makeRandomTestFile(path1, BLOCK_SIZE, true, SEED); + // Create a file and wait till it is persisted. + makeTestFile(path1, BLOCK_SIZE, true); ensureFileReplicasOnStorageType(path1, RAM_DISK); + waitForMetric("RamDiskBlocksLazyPersisted", 1); - // Sleep for a short time to allow the lazy writer thread to do its job. - // However the block replica should not be evicted from RAM_DISK yet.
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); - - // No eviction should happen as the free ratio is below the threshold - FSDataInputStream fis = fs.open(path1); + HdfsDataInputStream fis = (HdfsDataInputStream) fs.open(path1); try { // Keep and open read handle to path1 while creating path2 byte[] buf = new byte[BUFFER_LENGTH]; fis.read(0, buf, 0, BUFFER_LENGTH); - - // Create the 2nd file that will trigger RAM_DISK eviction. - makeTestFile(path2, BLOCK_SIZE * 2, true); - ensureFileReplicasOnStorageType(path2, RAM_DISK); + triggerEviction(cluster.getDataNodes().get(0)); // Ensure path1 is still readable from the open SCR handle. - fis.read(fis.getPos(), buf, 0, BUFFER_LENGTH); - HdfsDataInputStream dfsis = (HdfsDataInputStream) fis; - Assert.assertEquals(2 * BUFFER_LENGTH, - dfsis.getReadStatistics().getTotalBytesRead()); - Assert.assertEquals(2 * BUFFER_LENGTH, - dfsis.getReadStatistics().getTotalShortCircuitBytesRead()); + fis.read(0, buf, 0, BUFFER_LENGTH); + assertThat(fis.getReadStatistics().getTotalBytesRead(), + is((long) 2 * BUFFER_LENGTH)); + assertThat(fis.getReadStatistics().getTotalShortCircuitBytesRead(), + is((long) 2 * BUFFER_LENGTH)); } finally { IOUtils.closeQuietly(fis); } - - // After the open handle is closed, path1 should be evicted to DISK. - triggerBlockReport(); - ensureFileReplicasOnStorageType(path1, DEFAULT); } @Test - public void testShortCircuitReadAfterEviction() - throws IOException, InterruptedException { - Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); - getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK) - .setUseScr(true) + public void testScrAfterEviction() + throws IOException, InterruptedException, TimeoutException { + getClusterBuilder().setUseScr(true) .setUseLegacyBlockReaderLocal(false) .build(); doShortCircuitReadAfterEvictionTest(); } @Test - public void testLegacyShortCircuitReadAfterEviction() - throws IOException, InterruptedException { - getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK) - .setUseScr(true) + public void testLegacyScrAfterEviction() + throws IOException, InterruptedException, TimeoutException { + getClusterBuilder().setUseScr(true) .setUseLegacyBlockReaderLocal(true) .build(); doShortCircuitReadAfterEvictionTest(); + + // In the implementation of legacy short-circuit reads, any failure is + // trapped silently, reverts back to a remote read, and also disables all + // subsequent legacy short-circuit reads in the ClientContext. + // Assert that it didn't get disabled. + ClientContext clientContext = client.getClientContext(); + Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal()); } private void doShortCircuitReadAfterEvictionTest() throws IOException, - InterruptedException { + InterruptedException, TimeoutException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); - Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); final int SEED = 0xFADED; makeRandomTestFile(path1, BLOCK_SIZE, true, SEED); + ensureFileReplicasOnStorageType(path1, RAM_DISK); + waitForMetric("RamDiskBlocksLazyPersisted", 1); // Verify short-circuit read from RAM_DISK. 
- ensureFileReplicasOnStorageType(path1, RAM_DISK); File metaFile = cluster.getBlockMetadataFile(0, DFSTestUtil.getFirstBlock(fs, path1)); assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize()); assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED)); - // Sleep for a short time to allow the lazy writer thread to do its job. - Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); - - // Verify short-circuit read from RAM_DISK once again. - ensureFileReplicasOnStorageType(path1, RAM_DISK); - metaFile = cluster.getBlockMetadataFile(0, - DFSTestUtil.getFirstBlock(fs, path1)); - assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize()); - assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED)); - - // Create another file with a replica on RAM_DISK, which evicts the first. - makeRandomTestFile(path2, BLOCK_SIZE, true, SEED); - Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); - triggerBlockReport(); + triggerEviction(cluster.getDataNodes().get(0)); // Verify short-circuit read still works from DEFAULT storage. This time, // we'll have a checksum written during lazy persistence. @@ -219,54 +195,35 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase { DFSTestUtil.getFirstBlock(fs, path1)); assertTrue(metaFile.length() > BlockMetadataHeader.getHeaderSize()); assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED)); - - // In the implementation of legacy short-circuit reads, any failure is - // trapped silently, reverts back to a remote read, and also disables all - // subsequent legacy short-circuit reads in the ClientContext. If the test - // uses legacy, then assert that it didn't get disabled. - ClientContext clientContext = client.getClientContext(); - if (clientContext.getUseLegacyBlockReaderLocal()) { - Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal()); - } } @Test - public void testShortCircuitReadBlockFileCorruption() throws IOException, - InterruptedException { - Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); - getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK) - .setUseScr(true) + public void testScrBlockFileCorruption() throws IOException, + InterruptedException, TimeoutException { + getClusterBuilder().setUseScr(true) .setUseLegacyBlockReaderLocal(false) .build(); doShortCircuitReadBlockFileCorruptionTest(); } @Test - public void testLegacyShortCircuitReadBlockFileCorruption() throws IOException, - InterruptedException { - getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK) - .setUseScr(true) + public void testLegacyScrBlockFileCorruption() throws IOException, + InterruptedException, TimeoutException { + getClusterBuilder().setUseScr(true) .setUseLegacyBlockReaderLocal(true) .build(); doShortCircuitReadBlockFileCorruptionTest(); } public void doShortCircuitReadBlockFileCorruptionTest() throws IOException, - InterruptedException { + InterruptedException, TimeoutException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); - Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); - final int SEED = 0xFADED; - makeRandomTestFile(path1, BLOCK_SIZE, true, SEED); + makeTestFile(path1, BLOCK_SIZE, true); ensureFileReplicasOnStorageType(path1, RAM_DISK); - - // Create another file with a replica on RAM_DISK, which evicts the first. - makeRandomTestFile(path2, BLOCK_SIZE, true, SEED); - - // Sleep for a short time to allow the lazy writer thread to do its job. 
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); - triggerBlockReport(); + waitForMetric("RamDiskBlocksLazyPersisted", 1); + triggerEviction(cluster.getDataNodes().get(0)); // Corrupt the lazy-persisted block file, and verify that checksum // verification catches it. @@ -277,42 +234,32 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase { } @Test - public void testShortCircuitReadMetaFileCorruption() throws IOException, - InterruptedException { - Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); - getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK) - .setUseScr(true) + public void testScrMetaFileCorruption() throws IOException, + InterruptedException, TimeoutException { + getClusterBuilder().setUseScr(true) .setUseLegacyBlockReaderLocal(false) .build(); doShortCircuitReadMetaFileCorruptionTest(); } @Test - public void testLegacyShortCircuitReadMetaFileCorruption() throws IOException, - InterruptedException { - getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK) - .setUseScr(true) + public void testLegacyScrMetaFileCorruption() throws IOException, + InterruptedException, TimeoutException { + getClusterBuilder().setUseScr(true) .setUseLegacyBlockReaderLocal(true) .build(); doShortCircuitReadMetaFileCorruptionTest(); } public void doShortCircuitReadMetaFileCorruptionTest() throws IOException, - InterruptedException { + InterruptedException, TimeoutException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); - Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); - final int SEED = 0xFADED; - makeRandomTestFile(path1, BLOCK_SIZE, true, SEED); + makeTestFile(path1, BLOCK_SIZE, true); ensureFileReplicasOnStorageType(path1, RAM_DISK); - - // Create another file with a replica on RAM_DISK, which evicts the first. - makeRandomTestFile(path2, BLOCK_SIZE, true, SEED); - - // Sleep for a short time to allow the lazy writer thread to do its job. - Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); - triggerBlockReport(); + waitForMetric("RamDiskBlocksLazyPersisted", 1); + triggerEviction(cluster.getDataNodes().get(0)); // Corrupt the lazy-persisted checksum file, and verify that checksum // verification catches it.