HDFS-8192. Eviction should key off used locked memory instead of ram disk free space. (Contributed by Arpit Agarwal)

Arpit Agarwal 2015-06-20 13:27:52 -07:00
parent 658b5c84ae
commit c7d022b66f
10 changed files with 227 additions and 250 deletions
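In brief: instead of the LazyWriter polling RAM disk free space against a low-watermark and evicting in the background, the write path now makes a synchronous reservation against the DataNode's locked-memory cache and evicts on demand when the reservation fails. A minimal, self-contained sketch of that flow (method names mirror the diff below; the plain long counters and the 4 KiB page size are illustrative stand-ins for FsDatasetCache's internals, not the real implementation):

// Sketch only: models FsDatasetCache#reserve / #roundUpPageSize and
// LazyWriter#evictBlocks with simple counters.
public class LockedMemorySketch {
  static final long OS_PAGE_SIZE = 4096;   // assumed page size
  static long capacity = 2 * OS_PAGE_SIZE; // dfs.datanode.max.locked.memory
  static long used = 0;

  // Like FsDatasetCache#reserve: returns the new usage, or -1 on failure.
  static long reserve(long bytes) {
    if (used + bytes > capacity) {
      return -1;
    }
    return used += bytes;
  }

  // Like FsDatasetCache#roundUpPageSize: integer ceiling to a page boundary.
  static long roundUpPageSize(long count) {
    return (count + OS_PAGE_SIZE - 1) / OS_PAGE_SIZE * OS_PAGE_SIZE;
  }

  // Like LazyWriter#evictBlocks: evict replicas until bytesNeeded is free.
  static void evictBlocks(long bytesNeeded) {
    while (capacity - used < bytesNeeded && used > 0) {
      used -= OS_PAGE_SIZE; // evict one page-sized replica
    }
  }

  // The new synchronous path taken by createRbw via reserveLockedMemory.
  static boolean reserveLockedMemory(long bytesNeeded) {
    if (reserve(bytesNeeded) > 0) {
      return true;
    }
    bytesNeeded = roundUpPageSize(bytesNeeded);
    evictBlocks(bytesNeeded);
    return reserve(bytesNeeded) > 0;
  }

  public static void main(String[] args) {
    System.out.println(reserveLockedMemory(4096)); // true: fits
    System.out.println(reserveLockedMemory(4096)); // true: fits
    System.out.println(reserveLockedMemory(4096)); // true: evicts, then fits
  }
}

This keys admission to RAM disk off the same budget that mlock enforces (dfs.datanode.max.locked.memory), so eviction can no longer lag behind writes the way the old free-space watermark allowed.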

View File

@@ -658,6 +658,9 @@ Release 2.8.0 - UNRELEASED
     do not generate spurious reconfig warnings (Lei (Eddy) Xu via Colin P.
     McCabe)
 
+    HDFS-8192. Eviction should key off used locked memory instead of
+    ram disk free space. (Arpit Agarwal)
+
   OPTIMIZATIONS
 
   HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@@ -94,10 +94,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
   public static final String DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";
   public static final Class<RamDiskReplicaLruTracker> DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class;
-  public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent";
-  public static final float DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10.0f;
-  public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES = "dfs.datanode.ram.disk.low.watermark.bytes";
-  public static final long DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT = DFS_BLOCK_SIZE_DEFAULT;
   public static final String DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY = "dfs.datanode.network.counts.cache.max.size";
   public static final int DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;

View File

@@ -404,6 +404,13 @@ long getOsPageSize() {
     return usedBytesCount.rounder.osPageSize;
   }
 
+  /**
+   * Round up to the OS page size.
+   */
+  long roundUpPageSize(long count) {
+    return usedBytesCount.rounder.roundUp(count);
+  }
+
   /**
    * Background worker that mmaps, mlocks, and checksums a block
    */
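For reference, rounding up to a page boundary is the usual integer ceiling; a hypothetical stand-alone equivalent of what rounder.roundUp computes here, with the page size passed in explicitly:

  // Assumed equivalent; the real rounder lives inside FsDatasetCache.
  static long roundUp(long count, long osPageSize) {
    return (count + osPageSize - 1) / osPageSize * osPageSize;
  }
  // e.g. roundUp(5000, 4096) == 8192 and roundUp(4096, 4096) == 4096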

View File

@@ -1302,14 +1302,13 @@ public synchronized ReplicaHandler createRbw(
     if (allowLazyPersist &&
         lazyWriter != null &&
         b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
-        (cacheManager.reserve(b.getNumBytes())) > 0) {
+        reserveLockedMemory(b.getNumBytes())) {
       try {
         // First try to place the block on a transient volume.
         ref = volumes.getNextTransientVolume(b.getNumBytes());
         datanode.getMetrics().incrRamDiskBlocksWrite();
       } catch(DiskOutOfSpaceException de) {
         // Ignore the exception since we just fall back to persistent storage.
-        datanode.getMetrics().incrRamDiskBlocksWriteFallback();
       } finally {
         if (ref == null) {
           cacheManager.release(b.getNumBytes());
@@ -1323,6 +1322,11 @@ public synchronized ReplicaHandler createRbw(
     FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
     // create an rbw file to hold block in the designated volume
 
+    if (allowLazyPersist && !v.isTransientStorage()) {
+      datanode.getMetrics().incrRamDiskBlocksWriteFallback();
+    }
+
     File f;
     try {
       f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
@@ -2833,20 +2837,11 @@ private void removeOldReplica(ReplicaInfo replicaInfo,
   class LazyWriter implements Runnable {
     private volatile boolean shouldRun = true;
     final int checkpointerInterval;
-    final float lowWatermarkFreeSpacePercentage;
-    final long lowWatermarkFreeSpaceBytes;
 
     public LazyWriter(Configuration conf) {
       this.checkpointerInterval = conf.getInt(
           DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
           DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC);
-      this.lowWatermarkFreeSpacePercentage = conf.getFloat(
-          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT,
-          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT);
-      this.lowWatermarkFreeSpaceBytes = conf.getLong(
-          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
-          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT);
     }
 
     /**
@@ -2908,41 +2903,17 @@ private boolean saveNextReplica() {
       return succeeded;
     }
 
-    private boolean transientFreeSpaceBelowThreshold() throws IOException {
-      long free = 0;
-      long capacity = 0;
-      float percentFree = 0.0f;
-
-      // Don't worry about fragmentation for now. We don't expect more than one
-      // transient volume per DN.
-      try (FsVolumeReferences volumes = getFsVolumeReferences()) {
-        for (FsVolumeSpi fvs : volumes) {
-          FsVolumeImpl v = (FsVolumeImpl) fvs;
-          if (v.isTransientStorage()) {
-            capacity += v.getCapacity();
-            free += v.getAvailable();
-          }
-        }
-      }
-
-      if (capacity == 0) {
-        return false;
-      }
-
-      percentFree = (float) ((double)free * 100 / capacity);
-      return (percentFree < lowWatermarkFreeSpacePercentage) ||
-          (free < lowWatermarkFreeSpaceBytes);
-    }
-
     /**
      * Attempt to evict one or more transient block replicas until we
-     * have at least spaceNeeded bytes free.
+     * have at least bytesNeeded bytes free.
      */
-    private void evictBlocks() throws IOException {
+    public void evictBlocks(long bytesNeeded) throws IOException {
       int iterations = 0;
+      final long cacheCapacity = cacheManager.getCacheCapacity();
 
       while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
-             transientFreeSpaceBelowThreshold()) {
+             (cacheCapacity - cacheManager.getCacheUsed()) < bytesNeeded) {
         RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
 
         if (replicaState == null) {
@@ -2959,7 +2930,8 @@ private void evictBlocks() throws IOException {
         final String bpid = replicaState.getBlockPoolId();
 
         synchronized (FsDatasetImpl.this) {
-          replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId());
+          replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
+                                       replicaState.getBlockId());
           Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
           blockFile = replicaInfo.getBlockFile();
           metaFile = replicaInfo.getMetaFile();
@@ -2968,7 +2940,8 @@ private void evictBlocks() throws IOException {
           ramDiskReplicaTracker.discardReplica(replicaState.getBlockPoolId(),
               replicaState.getBlockId(), false);
 
-          // Move the replica from lazyPersist/ to finalized/ on target volume
+          // Move the replica from lazyPersist/ to finalized/ on
+          // the target volume
           BlockPoolSlice bpSlice =
               replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
           File newBlockFile = bpSlice.activateSavedReplica(
@@ -2992,12 +2965,14 @@ private void evictBlocks() throws IOException {
           if (replicaState.getNumReads() == 0) {
             datanode.getMetrics().incrRamDiskBlocksEvictedWithoutRead();
           }
+        }
 
+        // Delete the block+meta files from RAM disk and release locked
+        // memory.
         removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
             blockFileUsed, metaFileUsed, bpid);
       }
     }
-    }
 
     @Override
     public void run() {
@@ -3006,7 +2981,6 @@ public void run() {
       while (fsRunning && shouldRun) {
         try {
           numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
-          evictBlocks();
 
           // Sleep if we have no more work to do or if it looks like we are not
           // making any forward progress. This is to ensure that if all persist
@@ -3094,5 +3068,37 @@ void releaseLockedMemory(long count, boolean roundup) {
       cacheManager.releaseRoundDown(count);
     }
   }
+
+  /**
+   * Attempt to evict blocks from the cache manager to free the requested
+   * number of bytes.
+   *
+   * @param bytesNeeded the number of bytes to free.
+   */
+  @VisibleForTesting
+  public void evictLazyPersistBlocks(long bytesNeeded) {
+    try {
+      ((LazyWriter) lazyWriter.getRunnable()).evictBlocks(bytesNeeded);
+    } catch(IOException ioe) {
+      LOG.info("Ignoring exception ", ioe);
+    }
+  }
+
+  /**
+   * Attempt to reserve the given amount of memory with the cache manager.
+   * @param bytesNeeded the number of bytes to reserve.
+   * @return true if the reservation succeeded.
+   */
+  boolean reserveLockedMemory(long bytesNeeded) {
+    if (cacheManager.reserve(bytesNeeded) > 0) {
+      return true;
+    }
+
+    // Round up bytes needed to osPageSize and attempt to evict
+    // one or more blocks to free up the reservation.
+    bytesNeeded = cacheManager.roundUpPageSize(bytesNeeded);
+    evictLazyPersistBlocks(bytesNeeded);
+    return cacheManager.reserve(bytesNeeded) > 0;
+  }
 }

View File

@@ -130,7 +130,6 @@ static void initConfWithRamDisk(Configuration conf,
     conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
     conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
-    conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, DEFAULT_RAM_DISK_BLOCK_SIZE);
     LazyPersistTestCase.initCacheManipulator();
   }

View File

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import com.google.common.base.Supplier;
+import org.apache.commons.lang.UnhandledException;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
@@ -37,6 +39,7 @@
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
@@ -55,6 +58,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -79,7 +83,6 @@ public abstract class LazyPersistTestCase {
   protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
   protected static final int BUFFER_LENGTH = 4096;
-  protected static final int EVICTION_LOW_WATERMARK = 1;
   private static final long HEARTBEAT_INTERVAL_SEC = 1;
   private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
   private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
@@ -236,7 +239,6 @@ protected final void startUpCluster(
       StorageType[] storageTypes,
       int ramDiskReplicaCapacity,
       long ramDiskStorageLimit,
-      long evictionLowWatermarkReplicas,
       long maxLockedMemory,
       boolean useSCR,
       boolean useLegacyBlockReaderLocal,
@@ -256,8 +258,6 @@ protected final void startUpCluster(
         HEARTBEAT_RECHECK_INTERVAL_MSEC);
     conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
         LAZY_WRITER_INTERVAL_SEC);
-    conf.setLong(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
-        evictionLowWatermarkReplicas * BLOCK_SIZE);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
     conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, maxLockedMemory);
@@ -389,12 +389,6 @@ public ClusterWithRamDiskBuilder setUseLegacyBlockReaderLocal(
       return this;
     }
 
-    public ClusterWithRamDiskBuilder setEvictionLowWatermarkReplicas(
-        long evictionLowWatermarkReplicas) {
-      this.evictionLowWatermarkReplicas = evictionLowWatermarkReplicas;
-      return this;
-    }
-
     public ClusterWithRamDiskBuilder disableScrubber() {
       this.disableScrubber = true;
       return this;
@@ -403,8 +397,8 @@ public ClusterWithRamDiskBuilder disableScrubber() {
     public void build() throws IOException {
       LazyPersistTestCase.this.startUpCluster(
           numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity,
-          ramDiskStorageLimit, evictionLowWatermarkReplicas,
-          maxLockedMemory, useScr, useLegacyBlockReaderLocal, disableScrubber);
+          ramDiskStorageLimit, maxLockedMemory, useScr, useLegacyBlockReaderLocal,
+          disableScrubber);
     }
 
     private int numDatanodes = REPL_FACTOR;
@@ -415,7 +409,6 @@ public void build() throws IOException {
     private boolean hasTransientStorage = true;
     private boolean useScr = false;
     private boolean useLegacyBlockReaderLocal = false;
-    private long evictionLowWatermarkReplicas = EVICTION_LOW_WATERMARK;
     private boolean disableScrubber=false;
   }
@@ -513,4 +506,27 @@ private void printRamDiskJMXMetrics() {
       e.printStackTrace();
     }
   }
+
+  protected void waitForMetric(final String metricName, final int expectedValue)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          final int currentValue = Integer.parseInt(jmx.getValue(metricName));
+          LOG.info("Waiting for " + metricName +
+              " to reach value " + expectedValue +
+              ", current value = " + currentValue);
+          return currentValue == expectedValue;
+        } catch (Exception e) {
+          throw new UnhandledException("Test failed due to unexpected exception", e);
+        }
+      }
+    }, 1000, Integer.MAX_VALUE);
+  }
+
+  protected void triggerEviction(DataNode dn) {
+    FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset();
+    fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle.
+  }
 }
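Taken together, the two helpers above replace the fixed sleeps in the older tests. A typical call sequence, as the updated tests below use it (path1 and BLOCK_SIZE as defined in the tests):

  makeTestFile(path1, BLOCK_SIZE, true);
  ensureFileReplicasOnStorageType(path1, RAM_DISK);
  waitForMetric("RamDiskBlocksLazyPersisted", 1);  // wait for lazy persist
  triggerEviction(cluster.getDataNodes().get(0));  // evict synchronously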

View File

@@ -28,9 +28,7 @@
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.MetricsAsserts;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -103,25 +101,26 @@ public void testReleaseOnFileDeletion()
    * Verify that locked RAM is released when blocks are evicted from RAM disk.
    */
   @Test
-  public void testReleaseOnEviction()
-      throws IOException, TimeoutException, InterruptedException {
+  public void testReleaseOnEviction() throws Exception {
     getClusterBuilder().setNumDatanodes(1)
                        .setMaxLockedMemory(BLOCK_SIZE)
                        .setRamDiskReplicaCapacity(BLOCK_SIZE * 2 - 1)
                        .build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
-    final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+    final FsDatasetImpl fsd =
+        (FsDatasetImpl) cluster.getDataNodes().get(0).getFSDataset();
 
-    Path path = new Path("/" + METHOD_NAME + ".dat");
-    makeTestFile(path, BLOCK_SIZE, true);
-    assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE));
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    makeTestFile(path1, BLOCK_SIZE, true);
 
-    // The block should get evicted soon since it pushes RAM disk free
-    // space below the threshold.
+    // Wait until the replica is written to persistent storage.
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
+
+    // Trigger eviction and verify locked bytes were released.
+    fsd.evictLazyPersistBlocks(Long.MAX_VALUE);
+    verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1);
     waitForLockedBytesUsed(fsd, 0);
-
-    MetricsRecordBuilder rb =
-        MetricsAsserts.getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
-    MetricsAsserts.assertCounter("RamDiskBlocksEvicted", 1L, rb);
   }
 
   /**

View File

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -28,6 +29,8 @@
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.fail;
 
 public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
@@ -70,32 +73,50 @@ public void testFallbackToDisk() throws IOException {
     ensureFileReplicasOnStorageType(path, DEFAULT);
   }
 
+  @Test
+  public void testSynchronousEviction() throws Exception {
+    getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE).build();
+
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+
+    final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    makeTestFile(path1, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Wait until the replica is written to persistent storage.
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
+
+    // Ensure that writing a new file to RAM DISK evicts the block
+    // for the previous one.
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+    makeTestFile(path2, BLOCK_SIZE, true);
+    verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1);
+  }
+
   /**
    * File can not fit in RamDisk even with eviction
    * @throws IOException
    */
   @Test
   public void testFallbackToDiskFull() throws Exception {
-    getClusterBuilder().setRamDiskReplicaCapacity(0).build();
+    getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE / 2).build();
 
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
     makeTestFile(path, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path, DEFAULT);
 
     verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 1);
   }
 
   /**
    * File partially fit in RamDisk after eviction.
    * RamDisk can fit 2 blocks. Write a file with 5 blocks.
-   * Expect 2 or less blocks are on RamDisk and 3 or more on disk.
+   * Expect 2 blocks are on RamDisk and rest on disk.
    * @throws IOException
    */
   @Test
   public void testFallbackToDiskPartial()
       throws IOException, InterruptedException {
-    getClusterBuilder().setRamDiskReplicaCapacity(2).build();
+    getClusterBuilder().setMaxLockedMemory(2 * BLOCK_SIZE).build();
 
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -122,8 +143,8 @@ public void testFallbackToDiskPartial()
     // Since eviction is asynchronous, depending on the timing of eviction
     // wrt writes, we may get 2 or less blocks on RAM disk.
-    assert(numBlocksOnRamDisk <= 2);
-    assert(numBlocksOnDisk >= 3);
+    assertThat(numBlocksOnRamDisk, is(2));
+    assertThat(numBlocksOnDisk, is(3));
   }
 
   /**
@@ -134,7 +155,8 @@ public void testFallbackToDiskPartial()
    */
   @Test
   public void testRamDiskNotChosenByDefault() throws IOException {
-    getClusterBuilder().build();
+    getClusterBuilder().setStorageTypes(new StorageType[] {RAM_DISK, RAM_DISK})
+                       .build();
 
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");

View File

@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
@@ -38,18 +39,16 @@
 public class TestLazyWriter extends LazyPersistTestCase {
   @Test
   public void testLazyPersistBlocksAreSaved()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     getClusterBuilder().build();
+    final int NUM_BLOCKS = 10;
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
     // Create a test file
-    makeTestFile(path, BLOCK_SIZE * 10, true);
+    makeTestFile(path, BLOCK_SIZE * NUM_BLOCKS, true);
     LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
-
-    // Sleep for a short time to allow the lazy writer thread to do its job
-    Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    waitForMetric("RamDiskBlocksLazyPersisted", NUM_BLOCKS);
 
     LOG.info("Verifying copy was saved to lazyPersist/");
 
     // Make sure that there is a saved copy of the replica on persistent
@@ -57,35 +56,22 @@ public void testLazyPersistBlocksAreSaved()
     ensureLazyPersistBlocksAreSaved(locatedBlocks);
   }
 
-  /**
-   * RamDisk eviction after lazy persist to disk.
-   * @throws Exception
-   */
   @Test
-  public void testRamDiskEviction() throws Exception {
-    getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK).build();
+  public void testSynchronousEviction() throws Exception {
+    getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
-    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+
+    final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    makeTestFile(path1, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Wait until the replica is written to persistent storage.
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
+
+    // Ensure that writing a new file to RAM DISK evicts the block
+    // for the previous one.
     Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
-    final int SEED = 0xFADED;
-
-    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
-    ensureFileReplicasOnStorageType(path1, RAM_DISK);
-
-    // Sleep for a short time to allow the lazy writer thread to do its job.
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-    ensureFileReplicasOnStorageType(path1, RAM_DISK);
-
-    // Create another file with a replica on RAM_DISK.
     makeTestFile(path2, BLOCK_SIZE, true);
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-    triggerBlockReport();
-
-    // Ensure the first file was evicted to disk, the second is still on
-    // RAM_DISK.
-    ensureFileReplicasOnStorageType(path2, RAM_DISK);
-    ensureFileReplicasOnStorageType(path1, DEFAULT);
     verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1);
     verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1);
   }
@@ -98,8 +84,8 @@ public void testRamDiskEviction() throws Exception {
    */
   @Test
   public void testRamDiskEvictionBeforePersist()
-      throws IOException, InterruptedException {
-    getClusterBuilder().setRamDiskReplicaCapacity(1).build();
+      throws Exception {
+    getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
     Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
@@ -116,6 +102,7 @@ public void testRamDiskEvictionBeforePersist()
     // Eviction should not happen for block of the first file that is not
     // persisted yet.
+    verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 0);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
     ensureFileReplicasOnStorageType(path2, DEFAULT);
@@ -133,7 +120,7 @@ public void testRamDiskEvictionBeforePersist()
   public void testRamDiskEvictionIsLru()
       throws Exception {
     final int NUM_PATHS = 5;
-    getClusterBuilder().setRamDiskReplicaCapacity(NUM_PATHS + EVICTION_LOW_WATERMARK).build();
+    getClusterBuilder().setMaxLockedMemory(NUM_PATHS * BLOCK_SIZE).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path paths[] = new Path[NUM_PATHS * 2];
@@ -145,8 +132,7 @@ public void testRamDiskEvictionIsLru()
       makeTestFile(paths[i], BLOCK_SIZE, true);
     }
 
-    // Sleep for a short time to allow the lazy writer thread to do its job.
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    waitForMetric("RamDiskBlocksLazyPersisted", NUM_PATHS);
 
     for (int i = 0; i < NUM_PATHS; ++i) {
       ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
@@ -227,16 +213,13 @@ public void testDeleteAfterPersist()
     makeTestFile(path, BLOCK_SIZE, true);
     LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
-
-    // Sleep for a short time to allow the lazy writer thread to do its job
-    Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
 
     // Delete after persist
     client.delete(path.toString(), false);
     Assert.assertFalse(fs.exists(path));
 
     assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
 
     verifyRamDiskJMXMetric("RamDiskBlocksLazyPersisted", 1);
     verifyRamDiskJMXMetric("RamDiskBytesLazyPersisted", BLOCK_SIZE);
   }
@@ -248,7 +231,7 @@ public void testDeleteAfterPersist()
   @Test
   public void testDfsUsageCreateDelete()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     getClusterBuilder().setRamDiskReplicaCapacity(4).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -261,8 +244,7 @@ public void testDfsUsageCreateDelete()
     assertThat(usedAfterCreate, is((long) BLOCK_SIZE));
 
-    // Sleep for a short time to allow the lazy writer thread to do its job
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
 
     long usedAfterPersist = fs.getUsed();
     assertThat(usedAfterPersist, is((long) BLOCK_SIZE));

View File

@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -26,6 +27,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.NativeCodeLoader;
@@ -39,13 +41,20 @@
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Test Lazy persist behavior with short-circuit reads. These tests
+ * will be run on Linux only with Native IO enabled. The tests fake
+ * RAM_DISK storage using local disk.
+ */
 public class TestScrLazyPersistFiles extends LazyPersistTestCase {
 
   @BeforeClass
@@ -58,6 +67,10 @@ public void before() {
     Assume.assumeThat(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS,
         equalTo(true));
     Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+    final long osPageSize = NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
+    Preconditions.checkState(BLOCK_SIZE >= osPageSize);
+    Preconditions.checkState(BLOCK_SIZE % osPageSize == 0);
   }
 
   @Rule
@@ -69,35 +82,27 @@ public void before() {
    */
   @Test
   public void testRamDiskShortCircuitRead()
-      throws IOException, InterruptedException {
-    getClusterBuilder().setNumDatanodes(REPL_FACTOR)
-                       .setStorageTypes(new StorageType[]{RAM_DISK, DEFAULT})
-                       .setRamDiskStorageLimit(2 * BLOCK_SIZE - 1)
-                       .setUseScr(true)
-                       .build();
+      throws IOException, InterruptedException, TimeoutException {
+    getClusterBuilder().setUseScr(true).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     final int SEED = 0xFADED;
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
+    // Create a file and wait till it is persisted.
     makeRandomTestFile(path, BLOCK_SIZE, true, SEED);
     ensureFileReplicasOnStorageType(path, RAM_DISK);
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
 
-    // Sleep for a short time to allow the lazy writer thread to do its job
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-
-    //assertThat(verifyReadRandomFile(path, BLOCK_SIZE, SEED), is(true));
-    FSDataInputStream fis = fs.open(path);
+    HdfsDataInputStream fis = (HdfsDataInputStream) fs.open(path);
 
     // Verify SCR read counters
     try {
-      fis = fs.open(path);
       byte[] buf = new byte[BUFFER_LENGTH];
       fis.read(0, buf, 0, BUFFER_LENGTH);
-      HdfsDataInputStream dfsis = (HdfsDataInputStream) fis;
       Assert.assertEquals(BUFFER_LENGTH,
-          dfsis.getReadStatistics().getTotalBytesRead());
+          fis.getReadStatistics().getTotalBytesRead());
       Assert.assertEquals(BUFFER_LENGTH,
-          dfsis.getReadStatistics().getTotalShortCircuitBytesRead());
+          fis.getReadStatistics().getTotalShortCircuitBytesRead());
     } finally {
       fis.close();
       fis = null;
@@ -111,106 +116,77 @@ public void testRamDiskShortCircuitRead()
    * @throws InterruptedException
    */
   @Test
-  public void testRamDiskEvictionWithShortCircuitReadHandle()
-      throws IOException, InterruptedException {
-    // 5 replica + delta, SCR.
-    getClusterBuilder().setNumDatanodes(REPL_FACTOR)
-                       .setStorageTypes(new StorageType[]{RAM_DISK, DEFAULT})
-                       .setRamDiskStorageLimit(6 * BLOCK_SIZE - 1)
-                       .setEvictionLowWatermarkReplicas(3)
-                       .setUseScr(true)
-                       .build();
+  public void testScrDuringEviction()
+      throws Exception {
+    getClusterBuilder().setUseScr(true).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
-    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
-    final int SEED = 0xFADED;
 
-    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    // Create a file and wait till it is persisted.
+    makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
 
-    // Sleep for a short time to allow the lazy writer thread to do its job.
-    // However the block replica should not be evicted from RAM_DISK yet.
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-
-    // No eviction should happen as the free ratio is below the threshold
-    FSDataInputStream fis = fs.open(path1);
+    HdfsDataInputStream fis = (HdfsDataInputStream) fs.open(path1);
     try {
       // Keep and open read handle to path1 while creating path2
       byte[] buf = new byte[BUFFER_LENGTH];
       fis.read(0, buf, 0, BUFFER_LENGTH);
+      triggerEviction(cluster.getDataNodes().get(0));
 
-      // Create the 2nd file that will trigger RAM_DISK eviction.
-      makeTestFile(path2, BLOCK_SIZE * 2, true);
-      ensureFileReplicasOnStorageType(path2, RAM_DISK);
-
       // Ensure path1 is still readable from the open SCR handle.
-      fis.read(fis.getPos(), buf, 0, BUFFER_LENGTH);
-      HdfsDataInputStream dfsis = (HdfsDataInputStream) fis;
-      Assert.assertEquals(2 * BUFFER_LENGTH,
-          dfsis.getReadStatistics().getTotalBytesRead());
-      Assert.assertEquals(2 * BUFFER_LENGTH,
-          dfsis.getReadStatistics().getTotalShortCircuitBytesRead());
+      fis.read(0, buf, 0, BUFFER_LENGTH);
+      assertThat(fis.getReadStatistics().getTotalBytesRead(),
+          is((long) 2 * BUFFER_LENGTH));
+      assertThat(fis.getReadStatistics().getTotalShortCircuitBytesRead(),
+          is((long) 2 * BUFFER_LENGTH));
     } finally {
      IOUtils.closeQuietly(fis);
    }
-
-    // After the open handle is closed, path1 should be evicted to DISK.
-    triggerBlockReport();
-    ensureFileReplicasOnStorageType(path1, DEFAULT);
   }
 
   @Test
-  public void testShortCircuitReadAfterEviction()
-      throws IOException, InterruptedException {
-    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
-    getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
-                       .setUseScr(true)
+  public void testScrAfterEviction()
+      throws IOException, InterruptedException, TimeoutException {
+    getClusterBuilder().setUseScr(true)
                        .setUseLegacyBlockReaderLocal(false)
                        .build();
     doShortCircuitReadAfterEvictionTest();
   }
 
   @Test
-  public void testLegacyShortCircuitReadAfterEviction()
-      throws IOException, InterruptedException {
-    getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
-                       .setUseScr(true)
+  public void testLegacyScrAfterEviction()
+      throws IOException, InterruptedException, TimeoutException {
+    getClusterBuilder().setUseScr(true)
                        .setUseLegacyBlockReaderLocal(true)
                        .build();
     doShortCircuitReadAfterEvictionTest();
+
+    // In the implementation of legacy short-circuit reads, any failure is
+    // trapped silently, reverts back to a remote read, and also disables all
+    // subsequent legacy short-circuit reads in the ClientContext.
+    // Assert that it didn't get disabled.
+    ClientContext clientContext = client.getClientContext();
+    Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal());
   }
 
   private void doShortCircuitReadAfterEvictionTest() throws IOException,
-      InterruptedException {
+      InterruptedException, TimeoutException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
-    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
     final int SEED = 0xFADED;
 
     makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
 
     // Verify short-circuit read from RAM_DISK.
-    ensureFileReplicasOnStorageType(path1, RAM_DISK);
     File metaFile = cluster.getBlockMetadataFile(0,
         DFSTestUtil.getFirstBlock(fs, path1));
     assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
     assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
 
-    // Sleep for a short time to allow the lazy writer thread to do its job.
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-
-    // Verify short-circuit read from RAM_DISK once again.
-    ensureFileReplicasOnStorageType(path1, RAM_DISK);
-    metaFile = cluster.getBlockMetadataFile(0,
-        DFSTestUtil.getFirstBlock(fs, path1));
-    assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
-    assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
-
-    // Create another file with a replica on RAM_DISK, which evicts the first.
-    makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-    triggerBlockReport();
+    triggerEviction(cluster.getDataNodes().get(0));
 
     // Verify short-circuit read still works from DEFAULT storage. This time,
     // we'll have a checksum written during lazy persistence.
@@ -219,54 +195,35 @@ private void doShortCircuitReadAfterEvictionTest() throws IOException,
         DFSTestUtil.getFirstBlock(fs, path1));
     assertTrue(metaFile.length() > BlockMetadataHeader.getHeaderSize());
     assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
-
-    // In the implementation of legacy short-circuit reads, any failure is
-    // trapped silently, reverts back to a remote read, and also disables all
-    // subsequent legacy short-circuit reads in the ClientContext. If the test
-    // uses legacy, then assert that it didn't get disabled.
-    ClientContext clientContext = client.getClientContext();
-    if (clientContext.getUseLegacyBlockReaderLocal()) {
-      Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal());
-    }
   }
 
   @Test
-  public void testShortCircuitReadBlockFileCorruption() throws IOException,
-      InterruptedException {
-    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
-    getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
-                       .setUseScr(true)
+  public void testScrBlockFileCorruption() throws IOException,
+      InterruptedException, TimeoutException {
+    getClusterBuilder().setUseScr(true)
                        .setUseLegacyBlockReaderLocal(false)
                        .build();
     doShortCircuitReadBlockFileCorruptionTest();
   }
 
   @Test
-  public void testLegacyShortCircuitReadBlockFileCorruption() throws IOException,
-      InterruptedException {
-    getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
-                       .setUseScr(true)
+  public void testLegacyScrBlockFileCorruption() throws IOException,
+      InterruptedException, TimeoutException {
+    getClusterBuilder().setUseScr(true)
                        .setUseLegacyBlockReaderLocal(true)
                        .build();
     doShortCircuitReadBlockFileCorruptionTest();
   }
 
   public void doShortCircuitReadBlockFileCorruptionTest() throws IOException,
-      InterruptedException {
+      InterruptedException, TimeoutException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
-    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
-    final int SEED = 0xFADED;
 
-    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
 
-    // Create another file with a replica on RAM_DISK, which evicts the first.
-    makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
-
-    // Sleep for a short time to allow the lazy writer thread to do its job.
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-    triggerBlockReport();
+    triggerEviction(cluster.getDataNodes().get(0));
 
     // Corrupt the lazy-persisted block file, and verify that checksum
     // verification catches it.
@@ -277,42 +234,32 @@ public void doShortCircuitReadBlockFileCorruptionTest() throws IOException,
   }
 
   @Test
-  public void testShortCircuitReadMetaFileCorruption() throws IOException,
-      InterruptedException {
-    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
-    getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
-                       .setUseScr(true)
+  public void testScrMetaFileCorruption() throws IOException,
+      InterruptedException, TimeoutException {
+    getClusterBuilder().setUseScr(true)
                        .setUseLegacyBlockReaderLocal(false)
                        .build();
     doShortCircuitReadMetaFileCorruptionTest();
   }
 
   @Test
-  public void testLegacyShortCircuitReadMetaFileCorruption() throws IOException,
-      InterruptedException {
-    getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
-                       .setUseScr(true)
+  public void testLegacyScrMetaFileCorruption() throws IOException,
+      InterruptedException, TimeoutException {
+    getClusterBuilder().setUseScr(true)
                        .setUseLegacyBlockReaderLocal(true)
                        .build();
     doShortCircuitReadMetaFileCorruptionTest();
   }
 
   public void doShortCircuitReadMetaFileCorruptionTest() throws IOException,
-      InterruptedException {
+      InterruptedException, TimeoutException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
-    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
-    final int SEED = 0xFADED;
 
-    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    waitForMetric("RamDiskBlocksLazyPersisted", 1);
 
-    // Create another file with a replica on RAM_DISK, which evicts the first.
-    makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
-
-    // Sleep for a short time to allow the lazy writer thread to do its job.
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-    triggerBlockReport();
+    triggerEviction(cluster.getDataNodes().get(0));
 
     // Corrupt the lazy-persisted checksum file, and verify that checksum
     // verification catches it.