HDFS-5511. improve CacheManipulator interface to allow better unit testing (cmccabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1543676 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2f6b393543
commit
efea68dc35
|
@ -203,8 +203,8 @@ public class ReadaheadPool {
|
||||||
// It's also possible that we'll end up requesting readahead on some
|
// It's also possible that we'll end up requesting readahead on some
|
||||||
// other FD, which may be wasted work, but won't cause a problem.
|
// other FD, which may be wasted work, but won't cause a problem.
|
||||||
try {
|
try {
|
||||||
NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, off, len,
|
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
|
||||||
NativeIO.POSIX.POSIX_FADV_WILLNEED);
|
fd, off, len, NativeIO.POSIX.POSIX_FADV_WILLNEED);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
if (canceled) {
|
if (canceled) {
|
||||||
// no big deal - the reader canceled the request and closed
|
// no big deal - the reader canceled the request and closed
|
||||||
|
|
|
@ -98,9 +98,6 @@ public class NativeIO {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(NativeIO.class);
|
private static final Log LOG = LogFactory.getLog(NativeIO.class);
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public static CacheTracker cacheTracker = null;
|
|
||||||
|
|
||||||
private static boolean nativeLoaded = false;
|
private static boolean nativeLoaded = false;
|
||||||
private static boolean fadvisePossible = true;
|
private static boolean fadvisePossible = true;
|
||||||
private static boolean syncFileRangePossible = true;
|
private static boolean syncFileRangePossible = true;
|
||||||
|
@ -111,18 +108,61 @@ public class NativeIO {
|
||||||
|
|
||||||
private static long cacheTimeout = -1;
|
private static long cacheTimeout = -1;
|
||||||
|
|
||||||
public static interface CacheTracker {
|
private static CacheManipulator cacheManipulator = new CacheManipulator();
|
||||||
public void fadvise(String identifier, long offset, long len, int flags);
|
|
||||||
|
public static CacheManipulator getCacheManipulator() {
|
||||||
|
return cacheManipulator;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static CacheManipulator cacheManipulator = new CacheManipulator();
|
public static void setCacheManipulator(CacheManipulator cacheManipulator) {
|
||||||
|
POSIX.cacheManipulator = cacheManipulator;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to manipulate the operating system cache.
|
||||||
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static class CacheManipulator {
|
public static class CacheManipulator {
|
||||||
public void mlock(String identifier, ByteBuffer buffer,
|
public void mlock(String identifier, ByteBuffer buffer,
|
||||||
long len) throws IOException {
|
long len) throws IOException {
|
||||||
POSIX.mlock(buffer, len);
|
POSIX.mlock(buffer, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getMemlockLimit() {
|
||||||
|
return NativeIO.getMemlockLimit();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getOperatingSystemPageSize() {
|
||||||
|
return NativeIO.getOperatingSystemPageSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void posixFadviseIfPossible(String identifier,
|
||||||
|
FileDescriptor fd, long offset, long len, int flags)
|
||||||
|
throws NativeIOException {
|
||||||
|
NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset,
|
||||||
|
len, flags);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A CacheManipulator used for testing which does not actually call mlock.
|
||||||
|
* This allows many tests to be run even when the operating system does not
|
||||||
|
* allow mlock, or only allows limited mlocking.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public static class NoMlockCacheManipulator extends CacheManipulator {
|
||||||
|
public void mlock(String identifier, ByteBuffer buffer,
|
||||||
|
long len) throws IOException {
|
||||||
|
LOG.info("mlocking " + identifier);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMemlockLimit() {
|
||||||
|
return 1125899906842624L;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getOperatingSystemPageSize() {
|
||||||
|
return 4096;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static {
|
static {
|
||||||
|
@ -207,12 +247,9 @@ public class NativeIO {
|
||||||
*
|
*
|
||||||
* @throws NativeIOException if there is an error with the syscall
|
* @throws NativeIOException if there is an error with the syscall
|
||||||
*/
|
*/
|
||||||
public static void posixFadviseIfPossible(String identifier,
|
static void posixFadviseIfPossible(String identifier,
|
||||||
FileDescriptor fd, long offset, long len, int flags)
|
FileDescriptor fd, long offset, long len, int flags)
|
||||||
throws NativeIOException {
|
throws NativeIOException {
|
||||||
if (cacheTracker != null) {
|
|
||||||
cacheTracker.fadvise(identifier, offset, len, flags);
|
|
||||||
}
|
|
||||||
if (nativeLoaded && fadvisePossible) {
|
if (nativeLoaded && fadvisePossible) {
|
||||||
try {
|
try {
|
||||||
posix_fadvise(fd, offset, len, flags);
|
posix_fadvise(fd, offset, len, flags);
|
||||||
|
@ -566,7 +603,7 @@ public class NativeIO {
|
||||||
* Long.MAX_VALUE if there is no limit;
|
* Long.MAX_VALUE if there is no limit;
|
||||||
* The number of bytes that can be locked into memory otherwise.
|
* The number of bytes that can be locked into memory otherwise.
|
||||||
*/
|
*/
|
||||||
public static long getMemlockLimit() {
|
static long getMemlockLimit() {
|
||||||
return isAvailable() ? getMemlockLimit0() : 0;
|
return isAvailable() ? getMemlockLimit0() : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -575,7 +612,7 @@ public class NativeIO {
|
||||||
/**
|
/**
|
||||||
* @return the operating system's page size.
|
* @return the operating system's page size.
|
||||||
*/
|
*/
|
||||||
public static long getOperatingSystemPageSize() {
|
static long getOperatingSystemPageSize() {
|
||||||
try {
|
try {
|
||||||
Field f = Unsafe.class.getDeclaredField("theUnsafe");
|
Field f = Unsafe.class.getDeclaredField("theUnsafe");
|
||||||
f.setAccessible(true);
|
f.setAccessible(true);
|
||||||
|
|
|
@ -199,6 +199,9 @@ Trunk (Unreleased)
|
||||||
|
|
||||||
HDFS-5366. recaching improvements (cmccabe)
|
HDFS-5366. recaching improvements (cmccabe)
|
||||||
|
|
||||||
|
HDFS-5511. improve CacheManipulator interface to allow better unit testing
|
||||||
|
(cmccabe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
|
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
|
||||||
|
|
||||||
|
|
|
@ -657,8 +657,9 @@ class BlockReceiver implements Closeable {
|
||||||
//
|
//
|
||||||
long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
|
long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
|
||||||
if (dropPos > 0 && dropCacheBehindWrites) {
|
if (dropPos > 0 && dropCacheBehindWrites) {
|
||||||
NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
|
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
|
||||||
outFd, 0, dropPos, NativeIO.POSIX.POSIX_FADV_DONTNEED);
|
block.getBlockName(), outFd, 0, dropPos,
|
||||||
|
NativeIO.POSIX.POSIX_FADV_DONTNEED);
|
||||||
}
|
}
|
||||||
lastCacheManagementOffset = offsetInBlock;
|
lastCacheManagementOffset = offsetInBlock;
|
||||||
}
|
}
|
||||||
|
|
|
@ -375,8 +375,9 @@ class BlockSender implements java.io.Closeable {
|
||||||
((dropCacheBehindAllReads) ||
|
((dropCacheBehindAllReads) ||
|
||||||
(dropCacheBehindLargeReads && isLongRead()))) {
|
(dropCacheBehindLargeReads && isLongRead()))) {
|
||||||
try {
|
try {
|
||||||
NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
|
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
|
||||||
blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
|
block.getBlockName(), blockInFd, lastCacheDropOffset,
|
||||||
|
offset - lastCacheDropOffset,
|
||||||
NativeIO.POSIX.POSIX_FADV_DONTNEED);
|
NativeIO.POSIX.POSIX_FADV_DONTNEED);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("Unable to drop cache on file close", e);
|
LOG.warn("Unable to drop cache on file close", e);
|
||||||
|
@ -674,8 +675,9 @@ class BlockSender implements java.io.Closeable {
|
||||||
|
|
||||||
if (isLongRead() && blockInFd != null) {
|
if (isLongRead() && blockInFd != null) {
|
||||||
// Advise that this file descriptor will be accessed sequentially.
|
// Advise that this file descriptor will be accessed sequentially.
|
||||||
NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
|
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
|
||||||
blockInFd, 0, 0, NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
|
block.getBlockName(), blockInFd, 0, 0,
|
||||||
|
NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Trigger readahead of beginning of file if configured.
|
// Trigger readahead of beginning of file if configured.
|
||||||
|
@ -761,9 +763,9 @@ class BlockSender implements java.io.Closeable {
|
||||||
long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
|
long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
|
||||||
if (offset >= nextCacheDropOffset) {
|
if (offset >= nextCacheDropOffset) {
|
||||||
long dropLength = offset - lastCacheDropOffset;
|
long dropLength = offset - lastCacheDropOffset;
|
||||||
NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
|
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
|
||||||
blockInFd, lastCacheDropOffset, dropLength,
|
block.getBlockName(), blockInFd, lastCacheDropOffset,
|
||||||
NativeIO.POSIX.POSIX_FADV_DONTNEED);
|
dropLength, NativeIO.POSIX.POSIX_FADV_DONTNEED);
|
||||||
lastCacheDropOffset = offset;
|
lastCacheDropOffset = offset;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -667,7 +667,7 @@ public class DataNode extends Configured
|
||||||
" size (%s) is greater than zero and native code is not available.",
|
" size (%s) is greater than zero and native code is not available.",
|
||||||
DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
|
DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
|
||||||
}
|
}
|
||||||
long ulimit = NativeIO.getMemlockLimit();
|
long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
|
||||||
if (dnConf.maxLockedMemory > ulimit) {
|
if (dnConf.maxLockedMemory > ulimit) {
|
||||||
throw new RuntimeException(String.format(
|
throw new RuntimeException(String.format(
|
||||||
"Cannot start datanode because the configured max locked memory" +
|
"Cannot start datanode because the configured max locked memory" +
|
||||||
|
|
|
@ -163,7 +163,8 @@ public class FsDatasetCache {
|
||||||
private final UsedBytesCount usedBytesCount;
|
private final UsedBytesCount usedBytesCount;
|
||||||
|
|
||||||
public static class PageRounder {
|
public static class PageRounder {
|
||||||
private final long osPageSize = NativeIO.getOperatingSystemPageSize();
|
private final long osPageSize =
|
||||||
|
NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Round up a number to the operating system page size.
|
* Round up a number to the operating system page size.
|
||||||
|
|
|
@ -82,7 +82,7 @@ public class MappableBlock implements Closeable {
|
||||||
throw new IOException("Block InputStream has no FileChannel.");
|
throw new IOException("Block InputStream has no FileChannel.");
|
||||||
}
|
}
|
||||||
mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
|
mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
|
||||||
NativeIO.POSIX.cacheManipulator.mlock(blockFileName, mmap, length);
|
NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length);
|
||||||
verifyChecksum(length, metaIn, blockChannel, blockFileName);
|
verifyChecksum(length, metaIn, blockChannel, blockFileName);
|
||||||
mappableBlock = new MappableBlock(mmap, length);
|
mappableBlock = new MappableBlock(mmap, length);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -113,7 +113,8 @@ public class TestDatanodeConfig {
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
public void testMemlockLimit() throws Exception {
|
public void testMemlockLimit() throws Exception {
|
||||||
assumeTrue(NativeIO.isAvailable());
|
assumeTrue(NativeIO.isAvailable());
|
||||||
final long memlockLimit = NativeIO.getMemlockLimit();
|
final long memlockLimit =
|
||||||
|
NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
|
||||||
|
|
||||||
// Can't increase the memlock limit past the maximum.
|
// Can't increase the memlock limit past the maximum.
|
||||||
assumeTrue(memlockLimit != Long.MAX_VALUE);
|
assumeTrue(memlockLimit != Long.MAX_VALUE);
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import java.io.FileDescriptor;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -36,7 +37,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
|
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||||
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheTracker;
|
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
|
||||||
|
import org.apache.hadoop.io.nativeio.NativeIOException;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -54,7 +56,7 @@ public class TestCachingStrategy {
|
||||||
EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
|
EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
|
||||||
|
|
||||||
// Track calls to posix_fadvise.
|
// Track calls to posix_fadvise.
|
||||||
NativeIO.POSIX.cacheTracker = tracker;
|
NativeIO.POSIX.setCacheManipulator(tracker);
|
||||||
|
|
||||||
// Normally, we wait for a few megabytes of data to be read or written
|
// Normally, we wait for a few megabytes of data to be read or written
|
||||||
// before dropping the cache. This is to avoid an excessive number of
|
// before dropping the cache. This is to avoid an excessive number of
|
||||||
|
@ -106,12 +108,13 @@ public class TestCachingStrategy {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TestRecordingCacheTracker implements CacheTracker {
|
private static class TestRecordingCacheTracker extends CacheManipulator {
|
||||||
private final Map<String, Stats> map = new TreeMap<String, Stats>();
|
private final Map<String, Stats> map = new TreeMap<String, Stats>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
synchronized public void fadvise(String name,
|
public void posixFadviseIfPossible(String name,
|
||||||
long offset, long len, int flags) {
|
FileDescriptor fd, long offset, long len, int flags)
|
||||||
|
throws NativeIOException {
|
||||||
if ((len < 0) || (len > Integer.MAX_VALUE)) {
|
if ((len < 0) || (len > Integer.MAX_VALUE)) {
|
||||||
throw new RuntimeException("invalid length of " + len +
|
throw new RuntimeException("invalid length of " + len +
|
||||||
" passed to posixFadviseIfPossible");
|
" passed to posixFadviseIfPossible");
|
||||||
|
@ -126,6 +129,7 @@ public class TestCachingStrategy {
|
||||||
map.put(name, stats);
|
map.put(name, stats);
|
||||||
}
|
}
|
||||||
stats.fadvise((int)offset, (int)len, flags);
|
stats.fadvise((int)offset, (int)len, flags);
|
||||||
|
super.posixFadviseIfPossible(name, fd, offset, len, flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void clear() {
|
synchronized void clear() {
|
||||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||||
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
|
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
|
||||||
|
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
|
||||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.test.MetricsAsserts;
|
import org.apache.hadoop.test.MetricsAsserts;
|
||||||
|
@ -99,7 +100,6 @@ public class TestFsDatasetCache {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
assumeTrue(!Path.WINDOWS);
|
assumeTrue(!Path.WINDOWS);
|
||||||
assumeTrue(NativeIO.getMemlockLimit() >= CACHE_CAPACITY);
|
|
||||||
conf = new HdfsConfiguration();
|
conf = new HdfsConfiguration();
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
|
||||||
conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
|
conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
|
||||||
|
@ -122,18 +122,8 @@ public class TestFsDatasetCache {
|
||||||
|
|
||||||
spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn);
|
spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn);
|
||||||
|
|
||||||
prevCacheManipulator = NativeIO.POSIX.cacheManipulator;
|
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
|
||||||
|
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
|
||||||
// Save the current CacheManipulator and replace it at the end of the test
|
|
||||||
// Stub out mlock calls to avoid failing when not enough memory is lockable
|
|
||||||
// by the operating system.
|
|
||||||
NativeIO.POSIX.cacheManipulator = new CacheManipulator() {
|
|
||||||
@Override
|
|
||||||
public void mlock(String identifier,
|
|
||||||
ByteBuffer mmap, long length) throws IOException {
|
|
||||||
LOG.info("mlocking " + identifier);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -145,7 +135,7 @@ public class TestFsDatasetCache {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
// Restore the original CacheManipulator
|
// Restore the original CacheManipulator
|
||||||
NativeIO.POSIX.cacheManipulator = prevCacheManipulator;
|
NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void setHeartbeatResponse(DatanodeCommand[] cmds)
|
private static void setHeartbeatResponse(DatanodeCommand[] cmds)
|
||||||
|
@ -222,7 +212,8 @@ public class TestFsDatasetCache {
|
||||||
if (tries++ > 10) {
|
if (tries++ > 10) {
|
||||||
LOG.info("verifyExpectedCacheUsage: expected " +
|
LOG.info("verifyExpectedCacheUsage: expected " +
|
||||||
expected + ", got " + curDnCacheUsed + "; " +
|
expected + ", got " + curDnCacheUsed + "; " +
|
||||||
"memlock limit = " + NativeIO.getMemlockLimit() +
|
"memlock limit = " +
|
||||||
|
NativeIO.POSIX.getCacheManipulator().getMemlockLimit() +
|
||||||
". Waiting...");
|
". Waiting...");
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -297,40 +288,31 @@ public class TestFsDatasetCache {
|
||||||
*/
|
*/
|
||||||
@Test(timeout=600000)
|
@Test(timeout=600000)
|
||||||
public void testCacheAndUncacheBlockWithRetries() throws Exception {
|
public void testCacheAndUncacheBlockWithRetries() throws Exception {
|
||||||
CacheManipulator prevCacheManipulator = NativeIO.POSIX.cacheManipulator;
|
// We don't have to save the previous cacheManipulator
|
||||||
|
// because it will be reinstalled by the @After function.
|
||||||
|
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator() {
|
||||||
|
private final Set<String> seenIdentifiers = new HashSet<String>();
|
||||||
|
|
||||||
try {
|
@Override
|
||||||
NativeIO.POSIX.cacheManipulator = new CacheManipulator() {
|
public void mlock(String identifier,
|
||||||
private final Set<String> seenIdentifiers = new HashSet<String>();
|
ByteBuffer mmap, long length) throws IOException {
|
||||||
|
if (seenIdentifiers.contains(identifier)) {
|
||||||
@Override
|
// mlock succeeds the second time.
|
||||||
public void mlock(String identifier,
|
LOG.info("mlocking " + identifier);
|
||||||
ByteBuffer mmap, long length) throws IOException {
|
return;
|
||||||
if (seenIdentifiers.contains(identifier)) {
|
|
||||||
// mlock succeeds the second time.
|
|
||||||
LOG.info("mlocking " + identifier);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
seenIdentifiers.add(identifier);
|
|
||||||
throw new IOException("injecting IOException during mlock of " +
|
|
||||||
identifier);
|
|
||||||
}
|
}
|
||||||
};
|
seenIdentifiers.add(identifier);
|
||||||
testCacheAndUncacheBlock();
|
throw new IOException("injecting IOException during mlock of " +
|
||||||
} finally {
|
identifier);
|
||||||
NativeIO.POSIX.cacheManipulator = prevCacheManipulator;
|
}
|
||||||
}
|
});
|
||||||
|
testCacheAndUncacheBlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=600000)
|
@Test(timeout=600000)
|
||||||
public void testFilesExceedMaxLockedMemory() throws Exception {
|
public void testFilesExceedMaxLockedMemory() throws Exception {
|
||||||
LOG.info("beginning testFilesExceedMaxLockedMemory");
|
LOG.info("beginning testFilesExceedMaxLockedMemory");
|
||||||
|
|
||||||
// We don't want to deal with page rounding issues, so skip this
|
|
||||||
// test if page size is weird
|
|
||||||
long osPageSize = NativeIO.getOperatingSystemPageSize();
|
|
||||||
assumeTrue(osPageSize == 4096);
|
|
||||||
|
|
||||||
// Create some test files that will exceed total cache capacity
|
// Create some test files that will exceed total cache capacity
|
||||||
final int numFiles = 5;
|
final int numFiles = 5;
|
||||||
final long fileSize = 15000;
|
final long fileSize = 15000;
|
||||||
|
@ -411,7 +393,7 @@ public class TestFsDatasetCache {
|
||||||
assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
|
assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
|
||||||
assertEquals("Unexpected amount of cache used", current, cacheUsed);
|
assertEquals("Unexpected amount of cache used", current, cacheUsed);
|
||||||
|
|
||||||
NativeIO.POSIX.cacheManipulator = new NativeIO.POSIX.CacheManipulator() {
|
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator() {
|
||||||
@Override
|
@Override
|
||||||
public void mlock(String identifier,
|
public void mlock(String identifier,
|
||||||
ByteBuffer mmap, long length) throws IOException {
|
ByteBuffer mmap, long length) throws IOException {
|
||||||
|
@ -422,7 +404,7 @@ public class TestFsDatasetCache {
|
||||||
Assert.fail();
|
Assert.fail();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
});
|
||||||
// Starting caching each block in succession. The usedBytes amount
|
// Starting caching each block in succession. The usedBytes amount
|
||||||
// should increase, even though caching doesn't complete on any of them.
|
// should increase, even though caching doesn't complete on any of them.
|
||||||
for (int i=0; i<NUM_BLOCKS; i++) {
|
for (int i=0; i<NUM_BLOCKS; i++) {
|
||||||
|
|
|
@ -61,12 +61,12 @@ import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||||
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
|
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
|
||||||
|
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.GSet;
|
import org.apache.hadoop.util.GSet;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assume;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -98,18 +98,8 @@ public class TestPathBasedCacheRequests {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
dfs = cluster.getFileSystem();
|
dfs = cluster.getFileSystem();
|
||||||
proto = cluster.getNameNodeRpc();
|
proto = cluster.getNameNodeRpc();
|
||||||
prevCacheManipulator = NativeIO.POSIX.cacheManipulator;
|
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
|
||||||
|
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
|
||||||
// Save the current CacheManipulator and replace it at the end of the test
|
|
||||||
// Stub out mlock calls to avoid failing when not enough memory is lockable
|
|
||||||
// by the operating system.
|
|
||||||
NativeIO.POSIX.cacheManipulator = new CacheManipulator() {
|
|
||||||
@Override
|
|
||||||
public void mlock(String identifier,
|
|
||||||
ByteBuffer mmap, long length) throws IOException {
|
|
||||||
LOG.info("mlocking " + identifier);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -118,7 +108,7 @@ public class TestPathBasedCacheRequests {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
// Restore the original CacheManipulator
|
// Restore the original CacheManipulator
|
||||||
NativeIO.POSIX.cacheManipulator = prevCacheManipulator;
|
NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
|
@ -654,20 +644,6 @@ public class TestPathBasedCacheRequests {
|
||||||
// Most Linux installs will allow non-root users to lock 64KB.
|
// Most Linux installs will allow non-root users to lock 64KB.
|
||||||
private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
|
private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
|
||||||
|
|
||||||
/**
|
|
||||||
* Return true if we can test DN caching.
|
|
||||||
*/
|
|
||||||
private static boolean canTestDatanodeCaching() {
|
|
||||||
if (!NativeIO.isAvailable()) {
|
|
||||||
// Need NativeIO in order to cache blocks on the DN.
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (NativeIO.getMemlockLimit() < CACHE_CAPACITY) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static HdfsConfiguration createCachingConf() {
|
private static HdfsConfiguration createCachingConf() {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||||
|
@ -681,7 +657,6 @@ public class TestPathBasedCacheRequests {
|
||||||
|
|
||||||
@Test(timeout=120000)
|
@Test(timeout=120000)
|
||||||
public void testWaitForCachedReplicas() throws Exception {
|
public void testWaitForCachedReplicas() throws Exception {
|
||||||
Assume.assumeTrue(canTestDatanodeCaching());
|
|
||||||
HdfsConfiguration conf = createCachingConf();
|
HdfsConfiguration conf = createCachingConf();
|
||||||
FileSystemTestHelper helper = new FileSystemTestHelper();
|
FileSystemTestHelper helper = new FileSystemTestHelper();
|
||||||
MiniDFSCluster cluster =
|
MiniDFSCluster cluster =
|
||||||
|
@ -739,7 +714,6 @@ public class TestPathBasedCacheRequests {
|
||||||
@Test(timeout=120000)
|
@Test(timeout=120000)
|
||||||
public void testAddingPathBasedCacheDirectivesWhenCachingIsDisabled()
|
public void testAddingPathBasedCacheDirectivesWhenCachingIsDisabled()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Assume.assumeTrue(canTestDatanodeCaching());
|
|
||||||
HdfsConfiguration conf = createCachingConf();
|
HdfsConfiguration conf = createCachingConf();
|
||||||
conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, false);
|
conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, false);
|
||||||
MiniDFSCluster cluster =
|
MiniDFSCluster cluster =
|
||||||
|
@ -787,7 +761,6 @@ public class TestPathBasedCacheRequests {
|
||||||
|
|
||||||
@Test(timeout=120000)
|
@Test(timeout=120000)
|
||||||
public void testWaitForCachedReplicasInDirectory() throws Exception {
|
public void testWaitForCachedReplicasInDirectory() throws Exception {
|
||||||
Assume.assumeTrue(canTestDatanodeCaching());
|
|
||||||
HdfsConfiguration conf = createCachingConf();
|
HdfsConfiguration conf = createCachingConf();
|
||||||
MiniDFSCluster cluster =
|
MiniDFSCluster cluster =
|
||||||
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
|
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
|
||||||
|
@ -839,7 +812,6 @@ public class TestPathBasedCacheRequests {
|
||||||
*/
|
*/
|
||||||
@Test(timeout=120000)
|
@Test(timeout=120000)
|
||||||
public void testReplicationFactor() throws Exception {
|
public void testReplicationFactor() throws Exception {
|
||||||
Assume.assumeTrue(canTestDatanodeCaching());
|
|
||||||
HdfsConfiguration conf = createCachingConf();
|
HdfsConfiguration conf = createCachingConf();
|
||||||
MiniDFSCluster cluster =
|
MiniDFSCluster cluster =
|
||||||
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
|
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
|
||||||
|
|
|
@ -69,7 +69,7 @@ public class FadvisedChunkedFile extends ChunkedFile {
|
||||||
}
|
}
|
||||||
if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
|
if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
|
||||||
try {
|
try {
|
||||||
NativeIO.POSIX.posixFadviseIfPossible(identifier,
|
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
|
||||||
fd,
|
fd,
|
||||||
getStartOffset(), getEndOffset() - getStartOffset(),
|
getStartOffset(), getEndOffset() - getStartOffset(),
|
||||||
NativeIO.POSIX.POSIX_FADV_DONTNEED);
|
NativeIO.POSIX.POSIX_FADV_DONTNEED);
|
||||||
|
|
|
@ -79,7 +79,7 @@ public class FadvisedFileRegion extends DefaultFileRegion {
|
||||||
public void transferSuccessful() {
|
public void transferSuccessful() {
|
||||||
if (manageOsCache && getCount() > 0) {
|
if (manageOsCache && getCount() > 0) {
|
||||||
try {
|
try {
|
||||||
NativeIO.POSIX.posixFadviseIfPossible(identifier,
|
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
|
||||||
fd, getPosition(), getCount(),
|
fd, getPosition(), getCount(),
|
||||||
NativeIO.POSIX.POSIX_FADV_DONTNEED);
|
NativeIO.POSIX.POSIX_FADV_DONTNEED);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
|
Loading…
Reference in New Issue