HDFS-14393. Refactor FsDatasetCache for SCM cache implementation. Contributed by Rakesh R

(cherry picked from commit f3f51284d5)
This commit is contained in:
Rakesh Radhakrishnan 2019-03-29 00:18:15 +05:30
parent aab9fefddc
commit 712749c1a0
6 changed files with 298 additions and 131 deletions

View File

@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -131,104 +130,10 @@ public class FsDatasetCache {
private final long revocationPollingMs;
/**
* The approximate amount of cache space in use.
*
* This number is an overestimate, counting bytes that will be used only
* if pending caching operations succeed. It does not take into account
* pending uncaching operations.
*
* This overestimate is more useful to the NameNode than an underestimate,
* since we don't want the NameNode to assign us more replicas than
* we can cache, because of the current batch of operations.
*/
private final UsedBytesCount usedBytesCount;
public static class PageRounder {
private final long osPageSize =
NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
/**
* Round up a number to the operating system page size.
*/
public long roundUp(long count) {
return (count + osPageSize - 1) & (~(osPageSize - 1));
}
/**
* Round down a number to the operating system page size.
*/
public long roundDown(long count) {
return count & (~(osPageSize - 1));
}
}
private class UsedBytesCount {
private final AtomicLong usedBytes = new AtomicLong(0);
private final PageRounder rounder = new PageRounder();
/**
* Try to reserve more bytes.
*
* @param count The number of bytes to add. We will round this
* up to the page size.
*
* @return The new number of usedBytes if we succeeded;
* -1 if we failed.
*/
long reserve(long count) {
count = rounder.roundUp(count);
while (true) {
long cur = usedBytes.get();
long next = cur + count;
if (next > maxBytes) {
return -1;
}
if (usedBytes.compareAndSet(cur, next)) {
return next;
}
}
}
/**
* Release some bytes that we're using.
*
* @param count The number of bytes to release. We will round this
* up to the page size.
*
* @return The new number of usedBytes.
*/
long release(long count) {
count = rounder.roundUp(count);
return usedBytes.addAndGet(-count);
}
/**
* Release some bytes that we're using rounded down to the page size.
*
* @param count The number of bytes to release. We will round this
* down to the page size.
*
* @return The new number of usedBytes.
*/
long releaseRoundDown(long count) {
count = rounder.roundDown(count);
return usedBytes.addAndGet(-count);
}
long get() {
return usedBytes.get();
}
}
/**
* The total cache capacity in bytes.
*/
private final long maxBytes;
private final MappableBlockLoader mappableBlockLoader;
private final MemoryCacheStats memCacheStats;
/**
* Number of cache commands that could not be completed successfully
*/
@ -240,12 +145,10 @@ public class FsDatasetCache {
public FsDatasetCache(FsDatasetImpl dataset) throws IOException {
this.dataset = dataset;
this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory();
ThreadFactory workerFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("FsDatasetCache-%d-" + dataset.toString())
.build();
this.usedBytesCount = new UsedBytesCount();
this.uncachingExecutor = new ThreadPoolExecutor(
0, 1,
60, TimeUnit.SECONDS,
@ -270,7 +173,11 @@ public class FsDatasetCache {
". Reconfigure this to " + minRevocationPollingMs);
}
this.revocationPollingMs = confRevocationPollingMs;
this.mappableBlockLoader = new MemoryMappableBlockLoader();
this.mappableBlockLoader = new MemoryMappableBlockLoader(this);
// Both lazy writer and read cache are sharing this statistics.
this.memCacheStats = new MemoryCacheStats(
dataset.datanode.getDnConf().getMaxLockedMemory());
}
/**
@ -371,7 +278,7 @@ public class FsDatasetCache {
* -1 if we failed.
*/
long reserve(long count) {
return usedBytesCount.reserve(count);
return memCacheStats.reserve(count);
}
/**
@ -383,7 +290,7 @@ public class FsDatasetCache {
* @return The new number of usedBytes.
*/
long release(long count) {
return usedBytesCount.release(count);
return memCacheStats.release(count);
}
/**
@ -395,7 +302,7 @@ public class FsDatasetCache {
* @return The new number of usedBytes.
*/
long releaseRoundDown(long count) {
return usedBytesCount.releaseRoundDown(count);
return memCacheStats.releaseRoundDown(count);
}
/**
@ -404,14 +311,14 @@ public class FsDatasetCache {
* @return the OS page size.
*/
long getOsPageSize() {
return usedBytesCount.rounder.osPageSize;
return memCacheStats.getPageSize();
}
/**
* Round up to the OS page size.
*/
long roundUpPageSize(long count) {
return usedBytesCount.rounder.roundUp(count);
return memCacheStats.roundUpPageSize(count);
}
/**
@ -437,14 +344,14 @@ public class FsDatasetCache {
MappableBlock mappableBlock = null;
ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
key.getBlockId(), length, genstamp);
long newUsedBytes = reserve(length);
long newUsedBytes = mappableBlockLoader.reserve(length);
boolean reservedBytes = false;
try {
if (newUsedBytes < 0) {
LOG.warn("Failed to cache " + key + ": could not reserve " + length +
" more bytes in the cache: " +
DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
" of " + maxBytes + " exceeded.");
" of " + memCacheStats.getCacheCapacity() + " exceeded.");
return;
}
reservedBytes = true;
@ -497,10 +404,10 @@ public class FsDatasetCache {
IOUtils.closeQuietly(metaIn);
if (!success) {
if (reservedBytes) {
release(length);
mappableBlockLoader.release(length);
}
LOG.debug("Caching of {} was aborted. We are now caching only {} "
+ "bytes in total.", key, usedBytesCount.get());
+ "bytes in total.", key, memCacheStats.getCacheUsed());
IOUtils.closeQuietly(mappableBlock);
numBlocksFailedToCache.incrementAndGet();
@ -574,7 +481,8 @@ public class FsDatasetCache {
synchronized (FsDatasetCache.this) {
mappableBlockMap.remove(key);
}
long newUsedBytes = release(value.mappableBlock.getLength());
long newUsedBytes = mappableBlockLoader
.release(value.mappableBlock.getLength());
numBlocksCached.addAndGet(-1);
dataset.datanode.getMetrics().incrBlocksUncached(1);
if (revocationTimeMs != 0) {
@ -593,14 +501,14 @@ public class FsDatasetCache {
* Get the approximate amount of cache space used.
*/
public long getCacheUsed() {
return usedBytesCount.get();
return memCacheStats.getCacheUsed();
}
/**
* Get the maximum amount of bytes we can cache. This is a constant.
*/
public long getCacheCapacity() {
return maxBytes;
return memCacheStats.getCacheCapacity();
}
public long getNumBlocksFailedToCache() {

View File

@ -57,6 +57,26 @@ public abstract class MappableBlockLoader {
ExtendedBlockId key)
throws IOException;
/**
* Try to reserve some given bytes.
*
* @param bytesCount
* The number of bytes to add.
*
* @return The new number of usedBytes if we succeeded; -1 if we failed.
*/
abstract long reserve(long bytesCount);
/**
* Release some bytes that we're using.
*
* @param bytesCount
* The number of bytes to release.
*
* @return The new number of usedBytes.
*/
abstract long release(long bytesCount);
/**
* Reads bytes into a buffer until EOF or the buffer's limit is reached.
*/

View File

@ -0,0 +1,212 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.io.nativeio.NativeIO;
import com.google.common.annotations.VisibleForTesting;
/**
* Keeps statistics for the memory cache.
*/
class MemoryCacheStats {
/**
* The approximate amount of cache space in use.
*
* This number is an overestimate, counting bytes that will be used only if
* pending caching operations succeed. It does not take into account pending
* uncaching operations.
*
* This overestimate is more useful to the NameNode than an underestimate,
* since we don't want the NameNode to assign us more replicas than we can
* cache, because of the current batch of operations.
*/
private final UsedBytesCount usedBytesCount;
/**
* The total cache capacity in bytes.
*/
private final long maxBytes;
MemoryCacheStats(long maxBytes) {
this.usedBytesCount = new UsedBytesCount();
this.maxBytes = maxBytes;
}
/**
* Used to count operating system page size.
*/
@VisibleForTesting
static class PageRounder {
private final long osPageSize = NativeIO.POSIX.getCacheManipulator()
.getOperatingSystemPageSize();
/**
* Round up a number to the operating system page size.
*/
public long roundUp(long count) {
return (count + osPageSize - 1) & (~(osPageSize - 1));
}
/**
* Round down a number to the operating system page size.
*/
public long roundDown(long count) {
return count & (~(osPageSize - 1));
}
}
/**
* Counts used bytes for memory.
*/
private class UsedBytesCount {
private final AtomicLong usedBytes = new AtomicLong(0);
private MemoryCacheStats.PageRounder rounder = new PageRounder();
/**
* Try to reserve more bytes.
*
* @param count
* The number of bytes to add. We will round this up to the page
* size.
*
* @return The new number of usedBytes if we succeeded; -1 if we failed.
*/
long reserve(long count) {
count = rounder.roundUp(count);
while (true) {
long cur = usedBytes.get();
long next = cur + count;
if (next > getCacheCapacity()) {
return -1;
}
if (usedBytes.compareAndSet(cur, next)) {
return next;
}
}
}
/**
* Release some bytes that we're using.
*
* @param count
* The number of bytes to release. We will round this up to the
* page size.
*
* @return The new number of usedBytes.
*/
long release(long count) {
count = rounder.roundUp(count);
return usedBytes.addAndGet(-count);
}
/**
* Release some bytes that we're using rounded down to the page size.
*
* @param count
* The number of bytes to release. We will round this down to the
* page size.
*
* @return The new number of usedBytes.
*/
long releaseRoundDown(long count) {
count = rounder.roundDown(count);
return usedBytes.addAndGet(-count);
}
long get() {
return usedBytes.get();
}
}
// Stats related methods for FSDatasetMBean
/**
* Get the approximate amount of cache space used.
*/
public long getCacheUsed() {
return usedBytesCount.get();
}
/**
* Get the maximum amount of bytes we can cache. This is a constant.
*/
public long getCacheCapacity() {
return maxBytes;
}
/**
* Try to reserve more bytes.
*
* @param count
* The number of bytes to add. We will round this up to the page
* size.
*
* @return The new number of usedBytes if we succeeded; -1 if we failed.
*/
long reserve(long count) {
return usedBytesCount.reserve(count);
}
/**
* Release some bytes that we're using.
*
* @param count
* The number of bytes to release. We will round this up to the
* page size.
*
* @return The new number of usedBytes.
*/
long release(long count) {
return usedBytesCount.release(count);
}
/**
* Release some bytes that we're using rounded down to the page size.
*
* @param count
* The number of bytes to release. We will round this down to the
* page size.
*
* @return The new number of usedBytes.
*/
long releaseRoundDown(long count) {
return usedBytesCount.releaseRoundDown(count);
}
/**
* Get the OS page size.
*
* @return the OS page size.
*/
long getPageSize() {
return usedBytesCount.rounder.osPageSize;
}
/**
* Round up to the OS page size.
*/
long roundUpPageSize(long count) {
return usedBytesCount.rounder.roundUp(count);
}
}

View File

@ -42,6 +42,18 @@ import java.nio.channels.FileChannel;
@InterfaceStability.Unstable
public class MemoryMappableBlockLoader extends MappableBlockLoader {
private final FsDatasetCache cacheManager;
/**
* Constructs memory mappable loader.
*
* @param cacheManager
* FsDatasetCache reference.
*/
MemoryMappableBlockLoader(FsDatasetCache cacheManager) {
this.cacheManager = cacheManager;
}
/**
* Load the block.
*
@ -90,7 +102,7 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
/**
* Verifies the block's checksum. This is an I/O intensive operation.
*/
public void verifyChecksum(long length, FileInputStream metaIn,
private void verifyChecksum(long length, FileInputStream metaIn,
FileChannel blockChannel, String blockFileName)
throws IOException {
// Verify the checksum from the block's meta file
@ -139,4 +151,14 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
IOUtils.closeQuietly(metaChannel);
}
}
@Override
long reserve(long bytesCount) {
return cacheManager.reserve(bytesCount);
}
@Override
long release(long bytesCount) {
return cacheManager.release(bytesCount);
}
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestFsDatasetCache;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
@ -50,6 +51,9 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests FsDatasetCache behaviors.
*/
public class TestFsDatasetCacheRevocation {
private static final Logger LOG = LoggerFactory.getLogger(
TestFsDatasetCacheRevocation.class);
@ -112,19 +116,18 @@ public class TestFsDatasetCacheRevocation {
DistributedFileSystem dfs = cluster.getFileSystem();
// Create and cache a file.
final String TEST_FILE = "/test_file";
DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
final String testFile = "/test_file";
DFSTestUtil.createFile(dfs, new Path(testFile),
BLOCK_SIZE, (short)1, 0xcafe);
dfs.addCachePool(new CachePoolInfo("pool"));
long cacheDirectiveId =
dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
setPool("pool").setPath(new Path(TEST_FILE)).
setReplication((short) 1).build());
long cacheDirectiveId = dfs
.addCacheDirective(new CacheDirectiveInfo.Builder().setPool("pool")
.setPath(new Path(testFile)).setReplication((short) 1).build());
FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
// Mmap the file.
FSDataInputStream in = dfs.open(new Path(TEST_FILE));
FSDataInputStream in = dfs.open(new Path(testFile));
ByteBuffer buf =
in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));
@ -143,8 +146,8 @@ public class TestFsDatasetCacheRevocation {
}
/**
* Test that when we have an uncache request, and the client refuses to release
* the replica for a long time, we will un-mlock it.
* Test that when we have an uncache request, and the client refuses to
* release the replica for a long time, we will un-mlock it.
*/
@Test(timeout=120000)
public void testRevocation() throws Exception {
@ -163,19 +166,19 @@ public class TestFsDatasetCacheRevocation {
DistributedFileSystem dfs = cluster.getFileSystem();
// Create and cache a file.
final String TEST_FILE = "/test_file2";
DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
final String testFile = "/test_file2";
DFSTestUtil.createFile(dfs, new Path(testFile),
BLOCK_SIZE, (short)1, 0xcafe);
dfs.addCachePool(new CachePoolInfo("pool"));
long cacheDirectiveId =
dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
setPool("pool").setPath(new Path(TEST_FILE)).
setPool("pool").setPath(new Path(testFile)).
setReplication((short) 1).build());
FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
// Mmap the file.
FSDataInputStream in = dfs.open(new Path(TEST_FILE));
FSDataInputStream in = dfs.open(new Path(testFile));
ByteBuffer buf =
in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import net.jcip.annotations.NotThreadSafe;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@ -60,9 +60,11 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryCacheStats.PageRounder;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
@ -101,7 +103,7 @@ public class TestFsDatasetCache {
private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
// Most Linux installs allow a default of 64KB locked memory
static final long CACHE_CAPACITY = 64 * 1024;
public static final long CACHE_CAPACITY = 64 * 1024;
// mlock always locks the entire page. So we don't need to deal with this
// rounding, use the OS page size for the block size.
private static final long PAGE_SIZE =