HDFS-14393. Refactor FsDatasetCache for SCM cache implementation. Contributed by Rakesh R
(cherry picked from commit f3f51284d57ef2e0c7e968b6eea56eab578f7e93)
parent 3f6f095c06
commit 0c6824120a
FsDatasetCache.java
@@ -52,7 +52,6 @@
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,104 +130,10 @@ public boolean shouldAdvertise() {
 
   private final long revocationPollingMs;
 
-  /**
-   * The approximate amount of cache space in use.
-   *
-   * This number is an overestimate, counting bytes that will be used only
-   * if pending caching operations succeed. It does not take into account
-   * pending uncaching operations.
-   *
-   * This overestimate is more useful to the NameNode than an underestimate,
-   * since we don't want the NameNode to assign us more replicas than
-   * we can cache, because of the current batch of operations.
-   */
-  private final UsedBytesCount usedBytesCount;
-
-  public static class PageRounder {
-    private final long osPageSize =
-        NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
-
-    /**
-     * Round up a number to the operating system page size.
-     */
-    public long roundUp(long count) {
-      return (count + osPageSize - 1) & (~(osPageSize - 1));
-    }
-
-    /**
-     * Round down a number to the operating system page size.
-     */
-    public long roundDown(long count) {
-      return count & (~(osPageSize - 1));
-    }
-  }
-
-  private class UsedBytesCount {
-    private final AtomicLong usedBytes = new AtomicLong(0);
-
-    private final PageRounder rounder = new PageRounder();
-
-    /**
-     * Try to reserve more bytes.
-     *
-     * @param count    The number of bytes to add. We will round this
-     *                 up to the page size.
-     *
-     * @return         The new number of usedBytes if we succeeded;
-     *                 -1 if we failed.
-     */
-    long reserve(long count) {
-      count = rounder.roundUp(count);
-      while (true) {
-        long cur = usedBytes.get();
-        long next = cur + count;
-        if (next > maxBytes) {
-          return -1;
-        }
-        if (usedBytes.compareAndSet(cur, next)) {
-          return next;
-        }
-      }
-    }
-
-    /**
-     * Release some bytes that we're using.
-     *
-     * @param count    The number of bytes to release. We will round this
-     *                 up to the page size.
-     *
-     * @return         The new number of usedBytes.
-     */
-    long release(long count) {
-      count = rounder.roundUp(count);
-      return usedBytes.addAndGet(-count);
-    }
-
-    /**
-     * Release some bytes that we're using rounded down to the page size.
-     *
-     * @param count    The number of bytes to release. We will round this
-     *                 down to the page size.
-     *
-     * @return         The new number of usedBytes.
-     */
-    long releaseRoundDown(long count) {
-      count = rounder.roundDown(count);
-      return usedBytes.addAndGet(-count);
-    }
-
-    long get() {
-      return usedBytes.get();
-    }
-  }
-
-  /**
-   * The total cache capacity in bytes.
-   */
-  private final long maxBytes;
-
   private final MappableBlockLoader mappableBlockLoader;
 
+  private final MemoryCacheStats memCacheStats;
+
   /**
    * Number of cache commands that could not be completed successfully
    */
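The rounding helpers removed here (re-introduced unchanged in the new MemoryCacheStats class below) rely on the operating-system page size being a power of two. A minimal standalone sketch of the same bit trick, assuming a 4 KB page size in place of the value NativeIO reports:

```java
public class PageRoundingDemo {
  // Assumed page size for illustration; the real code queries NativeIO.
  private static final long OS_PAGE_SIZE = 4096;

  // Adding (pageSize - 1) before clearing the low-order bits rounds up.
  static long roundUp(long count) {
    return (count + OS_PAGE_SIZE - 1) & (~(OS_PAGE_SIZE - 1));
  }

  // Clearing the low-order bits alone rounds down.
  static long roundDown(long count) {
    return count & (~(OS_PAGE_SIZE - 1));
  }

  public static void main(String[] args) {
    System.out.println(roundUp(1));      // 4096
    System.out.println(roundUp(4097));   // 8192
    System.out.println(roundDown(8191)); // 4096
  }
}
```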
@@ -240,12 +145,10 @@ long get() {
 
   public FsDatasetCache(FsDatasetImpl dataset) throws IOException {
     this.dataset = dataset;
-    this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory();
     ThreadFactory workerFactory = new ThreadFactoryBuilder()
         .setDaemon(true)
         .setNameFormat("FsDatasetCache-%d-" + dataset.toString())
         .build();
-    this.usedBytesCount = new UsedBytesCount();
     this.uncachingExecutor = new ThreadPoolExecutor(
         0, 1,
         60, TimeUnit.SECONDS,
@@ -270,7 +173,11 @@ public FsDatasetCache(FsDatasetImpl dataset) throws IOException {
           ". Reconfigure this to " + minRevocationPollingMs);
     }
     this.revocationPollingMs = confRevocationPollingMs;
-    this.mappableBlockLoader = new MemoryMappableBlockLoader();
+
+    this.mappableBlockLoader = new MemoryMappableBlockLoader(this);
+    // Both lazy writer and read cache are sharing this statistics.
+    this.memCacheStats = new MemoryCacheStats(
+        dataset.datanode.getDnConf().getMaxLockedMemory());
   }
 
   /**
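The constructor change above sets up a deliberate two-way reference: FsDatasetCache owns the new MemoryCacheStats instance and passes itself to the loader, so loader-side accounting funnels into the one statistics object shared with the lazy writer. A hedged sketch of that ownership shape, using simplified stand-in classes rather than the real Hadoop types:

```java
// Stand-in classes illustrating the wiring only; names are hypothetical.
class Stats {
  private final long maxBytes;
  Stats(long maxBytes) { this.maxBytes = maxBytes; }
  long capacity() { return maxBytes; }
}

class Loader {
  private final Cache cacheManager;
  Loader(Cache cacheManager) { this.cacheManager = cacheManager; }
  // The loader holds no counters of its own; it delegates back to the cache.
  long reserve(long n) { return cacheManager.reserve(n); }
}

class Cache {
  private final Stats stats = new Stats(64 * 1024);
  private final Loader loader = new Loader(this); // mirrors new MemoryMappableBlockLoader(this)
  long reserve(long n) { return n <= stats.capacity() ? n : -1; } // stands in for stats.reserve(n)
}
```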
@@ -371,7 +278,7 @@ synchronized void uncacheBlock(String bpid, long blockId) {
    *                 -1 if we failed.
    */
   long reserve(long count) {
-    return usedBytesCount.reserve(count);
+    return memCacheStats.reserve(count);
   }
 
   /**
@@ -383,7 +290,7 @@ long reserve(long count) {
    * @return         The new number of usedBytes.
    */
   long release(long count) {
-    return usedBytesCount.release(count);
+    return memCacheStats.release(count);
   }
 
   /**
@@ -395,7 +302,7 @@ long release(long count) {
    * @return         The new number of usedBytes.
    */
   long releaseRoundDown(long count) {
-    return usedBytesCount.releaseRoundDown(count);
+    return memCacheStats.releaseRoundDown(count);
   }
 
   /**
@@ -404,14 +311,14 @@ long releaseRoundDown(long count) {
    * @return the OS page size.
    */
   long getOsPageSize() {
-    return usedBytesCount.rounder.osPageSize;
+    return memCacheStats.getPageSize();
   }
 
   /**
    * Round up to the OS page size.
    */
   long roundUpPageSize(long count) {
-    return usedBytesCount.rounder.roundUp(count);
+    return memCacheStats.roundUpPageSize(count);
   }
 
   /**
@@ -437,14 +344,14 @@ public void run() {
       MappableBlock mappableBlock = null;
       ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
           key.getBlockId(), length, genstamp);
-      long newUsedBytes = reserve(length);
+      long newUsedBytes = mappableBlockLoader.reserve(length);
       boolean reservedBytes = false;
       try {
         if (newUsedBytes < 0) {
           LOG.warn("Failed to cache " + key + ": could not reserve " + length +
               " more bytes in the cache: " +
               DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
-              " of " + maxBytes + " exceeded.");
+              " of " + memCacheStats.getCacheCapacity() + " exceeded.");
           return;
         }
         reservedBytes = true;
@@ -497,10 +404,10 @@ public void run() {
         IOUtils.closeQuietly(metaIn);
         if (!success) {
           if (reservedBytes) {
-            release(length);
+            mappableBlockLoader.release(length);
           }
           LOG.debug("Caching of {} was aborted. We are now caching only {} "
-              + "bytes in total.", key, usedBytesCount.get());
+              + "bytes in total.", key, memCacheStats.getCacheUsed());
           IOUtils.closeQuietly(mappableBlock);
           numBlocksFailedToCache.incrementAndGet();
 
@@ -574,7 +481,8 @@ public void run() {
       synchronized (FsDatasetCache.this) {
         mappableBlockMap.remove(key);
       }
-      long newUsedBytes = release(value.mappableBlock.getLength());
+      long newUsedBytes = mappableBlockLoader
+          .release(value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
       dataset.datanode.getMetrics().incrBlocksUncached(1);
       if (revocationTimeMs != 0) {
@@ -593,14 +501,14 @@ public void run() {
    * Get the approximate amount of cache space used.
    */
   public long getCacheUsed() {
-    return usedBytesCount.get();
+    return memCacheStats.getCacheUsed();
   }
 
   /**
    * Get the maximum amount of bytes we can cache. This is a constant.
    */
   public long getCacheCapacity() {
-    return maxBytes;
+    return memCacheStats.getCacheCapacity();
   }
 
   public long getNumBlocksFailedToCache() {
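The hunks above all preserve one calling convention: reserve() returns the new used-byte total on success and -1 when the capacity check fails, so a caller must test the sign before proceeding and must release exactly what it reserved if caching later aborts. A minimal sketch of that protocol (the Loader interface here is hypothetical):

```java
interface Loader {
  long reserve(long bytesCount); // new usedBytes, or -1 on failure
  long release(long bytesCount); // new usedBytes after the release
}

final class ReserveProtocolDemo {
  /** Reserve, attempt the work, and roll back the reservation on failure. */
  static boolean tryCache(Loader loader, long length) {
    if (loader.reserve(length) < 0) {
      return false;               // nothing was reserved, nothing to undo
    }
    boolean success = false;
    try {
      // ... mmap and checksum verification would happen here ...
      success = true;
    } finally {
      if (!success) {
        loader.release(length);   // undo the reservation, as the cleanup
      }                           // path in FsDatasetCache does
    }
    return success;
  }
}
```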
MappableBlockLoader.java
@@ -57,6 +57,26 @@ abstract MappableBlock load(long length, FileInputStream blockIn,
       ExtendedBlockId key)
       throws IOException;
 
+  /**
+   * Try to reserve some given bytes.
+   *
+   * @param bytesCount
+   *          The number of bytes to add.
+   *
+   * @return The new number of usedBytes if we succeeded; -1 if we failed.
+   */
+  abstract long reserve(long bytesCount);
+
+  /**
+   * Release some bytes that we're using.
+   *
+   * @param bytesCount
+   *          The number of bytes to release.
+   *
+   * @return The new number of usedBytes.
+   */
+  abstract long release(long bytesCount);
+
   /**
    * Reads bytes into a buffer until EOF or the buffer's limit is reached.
    */
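The two abstract methods added above make capacity accounting part of every loader's contract: reserve() must atomically check capacity and either claim the bytes or return -1, and release() must give them back. A hedged sketch of one way to satisfy that contract with a compare-and-set loop (hypothetical class name; the real implementation in this commit delegates to FsDatasetCache instead):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical counter satisfying the reserve/release contract above.
class CountingQuota {
  private final long maxBytes;
  private final AtomicLong usedBytes = new AtomicLong(0);

  CountingQuota(long maxBytes) { this.maxBytes = maxBytes; }

  long reserve(long bytesCount) {
    while (true) {
      long cur = usedBytes.get();
      long next = cur + bytesCount;
      if (next > maxBytes) {
        return -1;                        // over capacity: claim nothing
      }
      if (usedBytes.compareAndSet(cur, next)) {
        return next;                      // claimed atomically
      }
      // Another thread changed the counter between get() and CAS; retry.
    }
  }

  long release(long bytesCount) {
    return usedBytes.addAndGet(-bytesCount);
  }
}
```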
MemoryCacheStats.java (new file)
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Keeps statistics for the memory cache.
+ */
+class MemoryCacheStats {
+
+  /**
+   * The approximate amount of cache space in use.
+   *
+   * This number is an overestimate, counting bytes that will be used only if
+   * pending caching operations succeed. It does not take into account pending
+   * uncaching operations.
+   *
+   * This overestimate is more useful to the NameNode than an underestimate,
+   * since we don't want the NameNode to assign us more replicas than we can
+   * cache, because of the current batch of operations.
+   */
+  private final UsedBytesCount usedBytesCount;
+
+  /**
+   * The total cache capacity in bytes.
+   */
+  private final long maxBytes;
+
+  MemoryCacheStats(long maxBytes) {
+    this.usedBytesCount = new UsedBytesCount();
+    this.maxBytes = maxBytes;
+  }
+
+  /**
+   * Used to count operating system page size.
+   */
+  @VisibleForTesting
+  static class PageRounder {
+    private final long osPageSize = NativeIO.POSIX.getCacheManipulator()
+        .getOperatingSystemPageSize();
+
+    /**
+     * Round up a number to the operating system page size.
+     */
+    public long roundUp(long count) {
+      return (count + osPageSize - 1) & (~(osPageSize - 1));
+    }
+
+    /**
+     * Round down a number to the operating system page size.
+     */
+    public long roundDown(long count) {
+      return count & (~(osPageSize - 1));
+    }
+  }
+
+  /**
+   * Counts used bytes for memory.
+   */
+  private class UsedBytesCount {
+    private final AtomicLong usedBytes = new AtomicLong(0);
+
+    private MemoryCacheStats.PageRounder rounder = new PageRounder();
+
+    /**
+     * Try to reserve more bytes.
+     *
+     * @param count
+     *          The number of bytes to add. We will round this up to the page
+     *          size.
+     *
+     * @return The new number of usedBytes if we succeeded; -1 if we failed.
+     */
+    long reserve(long count) {
+      count = rounder.roundUp(count);
+      while (true) {
+        long cur = usedBytes.get();
+        long next = cur + count;
+        if (next > getCacheCapacity()) {
+          return -1;
+        }
+        if (usedBytes.compareAndSet(cur, next)) {
+          return next;
+        }
+      }
+    }
+
+    /**
+     * Release some bytes that we're using.
+     *
+     * @param count
+     *          The number of bytes to release. We will round this up to the
+     *          page size.
+     *
+     * @return The new number of usedBytes.
+     */
+    long release(long count) {
+      count = rounder.roundUp(count);
+      return usedBytes.addAndGet(-count);
+    }
+
+    /**
+     * Release some bytes that we're using rounded down to the page size.
+     *
+     * @param count
+     *          The number of bytes to release. We will round this down to the
+     *          page size.
+     *
+     * @return The new number of usedBytes.
+     */
+    long releaseRoundDown(long count) {
+      count = rounder.roundDown(count);
+      return usedBytes.addAndGet(-count);
+    }
+
+    long get() {
+      return usedBytes.get();
+    }
+  }
+
+  // Stats related methods for FSDatasetMBean
+
+  /**
+   * Get the approximate amount of cache space used.
+   */
+  public long getCacheUsed() {
+    return usedBytesCount.get();
+  }
+
+  /**
+   * Get the maximum amount of bytes we can cache. This is a constant.
+   */
+  public long getCacheCapacity() {
+    return maxBytes;
+  }
+
+  /**
+   * Try to reserve more bytes.
+   *
+   * @param count
+   *          The number of bytes to add. We will round this up to the page
+   *          size.
+   *
+   * @return The new number of usedBytes if we succeeded; -1 if we failed.
+   */
+  long reserve(long count) {
+    return usedBytesCount.reserve(count);
+  }
+
+  /**
+   * Release some bytes that we're using.
+   *
+   * @param count
+   *          The number of bytes to release. We will round this up to the
+   *          page size.
+   *
+   * @return The new number of usedBytes.
+   */
+  long release(long count) {
+    return usedBytesCount.release(count);
+  }
+
+  /**
+   * Release some bytes that we're using rounded down to the page size.
+   *
+   * @param count
+   *          The number of bytes to release. We will round this down to the
+   *          page size.
+   *
+   * @return The new number of usedBytes.
+   */
+  long releaseRoundDown(long count) {
+    return usedBytesCount.releaseRoundDown(count);
+  }
+
+  /**
+   * Get the OS page size.
+   *
+   * @return the OS page size.
+   */
+  long getPageSize() {
+    return usedBytesCount.rounder.osPageSize;
+  }
+
+  /**
+   * Round up to the OS page size.
+   */
+  long roundUpPageSize(long count) {
+    return usedBytesCount.rounder.roundUp(count);
+  }
+}
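Taken as a whole, the new class behaves like a page-granular quota: every reserve and release is first rounded to the OS page size, and reservations fail rather than overshoot maxBytes. A short usage sketch from within the same package (the class and its methods are package-private; expected values assume a 4 KB page size, which the real class reads from NativeIO):

```java
// Runs inside org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.
class MemoryCacheStatsDemo {
  static void demo() {
    MemoryCacheStats stats = new MemoryCacheStats(8192); // two 4 KB pages
    long a = stats.reserve(100);  // rounded up to one page  -> 4096
    long b = stats.reserve(100);  // rounded up again        -> 8192
    long c = stats.reserve(1);    // 8192 + 4096 > maxBytes  -> -1
    long d = stats.release(100);  // rounded up: frees a page -> 4096
  }
}
```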
MemoryMappableBlockLoader.java
@@ -42,6 +42,18 @@
 @InterfaceStability.Unstable
 public class MemoryMappableBlockLoader extends MappableBlockLoader {
 
+  private final FsDatasetCache cacheManager;
+
+  /**
+   * Constructs memory mappable loader.
+   *
+   * @param cacheManager
+   *          FsDatasetCache reference.
+   */
+  MemoryMappableBlockLoader(FsDatasetCache cacheManager) {
+    this.cacheManager = cacheManager;
+  }
+
   /**
    * Load the block.
    *
@@ -90,7 +102,7 @@ public MappableBlock load(long length, FileInputStream blockIn,
   /**
    * Verifies the block's checksum. This is an I/O intensive operation.
    */
-  public void verifyChecksum(long length, FileInputStream metaIn,
+  private void verifyChecksum(long length, FileInputStream metaIn,
       FileChannel blockChannel, String blockFileName)
       throws IOException {
     // Verify the checksum from the block's meta file
@@ -139,4 +151,14 @@ public void verifyChecksum(long length, FileInputStream metaIn,
       IOUtils.closeQuietly(metaChannel);
     }
   }
+
+  @Override
+  long reserve(long bytesCount) {
+    return cacheManager.reserve(bytesCount);
+  }
+
+  @Override
+  long release(long bytesCount) {
+    return cacheManager.release(bytesCount);
+  }
 }
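The two overrides above are pure delegation, which keeps MemoryMappableBlockLoader stateless with respect to accounting. The resulting call chain, sketched as annotated comments (method names come from the hunks in this commit; the chain itself is an inference from them):

```java
// caching task in FsDatasetCache
//   -> mappableBlockLoader.reserve(length)      // MemoryMappableBlockLoader
//        -> cacheManager.reserve(length)        // back into FsDatasetCache
//             -> memCacheStats.reserve(length)  // MemoryCacheStats:
//                                               // page rounding + CAS loop
```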
TestFsDatasetCacheRevocation.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestFsDatasetCache;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
@@ -50,6 +51,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Tests FsDatasetCache behaviors.
+ */
 public class TestFsDatasetCacheRevocation {
   private static final Logger LOG = LoggerFactory.getLogger(
       TestFsDatasetCacheRevocation.class);
@@ -112,19 +116,18 @@ public void testPinning() throws Exception {
     DistributedFileSystem dfs = cluster.getFileSystem();
 
     // Create and cache a file.
-    final String TEST_FILE = "/test_file";
-    DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
+    final String testFile = "/test_file";
+    DFSTestUtil.createFile(dfs, new Path(testFile),
         BLOCK_SIZE, (short)1, 0xcafe);
     dfs.addCachePool(new CachePoolInfo("pool"));
-    long cacheDirectiveId =
-        dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
-            setPool("pool").setPath(new Path(TEST_FILE)).
-            setReplication((short) 1).build());
+    long cacheDirectiveId = dfs
+        .addCacheDirective(new CacheDirectiveInfo.Builder().setPool("pool")
+            .setPath(new Path(testFile)).setReplication((short) 1).build());
     FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
     DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
 
     // Mmap the file.
-    FSDataInputStream in = dfs.open(new Path(TEST_FILE));
+    FSDataInputStream in = dfs.open(new Path(testFile));
     ByteBuffer buf =
         in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));
 
@@ -143,8 +146,8 @@ public void testPinning() throws Exception {
   }
 
   /**
-   * Test that when we have an uncache request, and the client refuses to release
-   * the replica for a long time, we will un-mlock it.
+   * Test that when we have an uncache request, and the client refuses to
+   * release the replica for a long time, we will un-mlock it.
    */
   @Test(timeout=120000)
   public void testRevocation() throws Exception {
@@ -163,19 +166,19 @@ public void testRevocation() throws Exception {
     DistributedFileSystem dfs = cluster.getFileSystem();
 
     // Create and cache a file.
-    final String TEST_FILE = "/test_file2";
-    DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
+    final String testFile = "/test_file2";
+    DFSTestUtil.createFile(dfs, new Path(testFile),
         BLOCK_SIZE, (short)1, 0xcafe);
     dfs.addCachePool(new CachePoolInfo("pool"));
     long cacheDirectiveId =
         dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
-            setPool("pool").setPath(new Path(TEST_FILE)).
+            setPool("pool").setPath(new Path(testFile)).
             setReplication((short) 1).build());
     FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
     DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
 
     // Mmap the file.
-    FSDataInputStream in = dfs.open(new Path(TEST_FILE));
+    FSDataInputStream in = dfs.open(new Path(testFile));
     ByteBuffer buf =
         in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));
 
TestFsDatasetCache.java (moved to package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl)
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import net.jcip.annotations.NotThreadSafe;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -59,9 +59,11 @@
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryCacheStats.PageRounder;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
@@ -100,7 +102,7 @@ public class TestFsDatasetCache {
       LoggerFactory.getLogger(TestFsDatasetCache.class);
 
   // Most Linux installs allow a default of 64KB locked memory
-  static final long CACHE_CAPACITY = 64 * 1024;
+  public static final long CACHE_CAPACITY = 64 * 1024;
   // mlock always locks the entire page. So we don't need to deal with this
   // rounding, use the OS page size for the block size.
   private static final long PAGE_SIZE =
|
Loading…
x
Reference in New Issue
Block a user