diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index b7795f67d46..f2abadaeb2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; -import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,104 +130,10 @@ public class FsDatasetCache { private final long revocationPollingMs; - /** - * The approximate amount of cache space in use. - * - * This number is an overestimate, counting bytes that will be used only - * if pending caching operations succeed. It does not take into account - * pending uncaching operations. - * - * This overestimate is more useful to the NameNode than an underestimate, - * since we don't want the NameNode to assign us more replicas than - * we can cache, because of the current batch of operations. - */ - private final UsedBytesCount usedBytesCount; - - public static class PageRounder { - private final long osPageSize = - NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize(); - - /** - * Round up a number to the operating system page size. - */ - public long roundUp(long count) { - return (count + osPageSize - 1) & (~(osPageSize - 1)); - } - - /** - * Round down a number to the operating system page size. - */ - public long roundDown(long count) { - return count & (~(osPageSize - 1)); - } - } - - private class UsedBytesCount { - private final AtomicLong usedBytes = new AtomicLong(0); - - private final PageRounder rounder = new PageRounder(); - - /** - * Try to reserve more bytes. - * - * @param count The number of bytes to add. We will round this - * up to the page size. - * - * @return The new number of usedBytes if we succeeded; - * -1 if we failed. - */ - long reserve(long count) { - count = rounder.roundUp(count); - while (true) { - long cur = usedBytes.get(); - long next = cur + count; - if (next > maxBytes) { - return -1; - } - if (usedBytes.compareAndSet(cur, next)) { - return next; - } - } - } - - /** - * Release some bytes that we're using. - * - * @param count The number of bytes to release. We will round this - * up to the page size. - * - * @return The new number of usedBytes. - */ - long release(long count) { - count = rounder.roundUp(count); - return usedBytes.addAndGet(-count); - } - - /** - * Release some bytes that we're using rounded down to the page size. - * - * @param count The number of bytes to release. We will round this - * down to the page size. - * - * @return The new number of usedBytes. - */ - long releaseRoundDown(long count) { - count = rounder.roundDown(count); - return usedBytes.addAndGet(-count); - } - - long get() { - return usedBytes.get(); - } - } - - /** - * The total cache capacity in bytes. 
- */ - private final long maxBytes; - private final MappableBlockLoader mappableBlockLoader; + private final MemoryCacheStats memCacheStats; + /** * Number of cache commands that could not be completed successfully */ @@ -240,12 +145,10 @@ public class FsDatasetCache { public FsDatasetCache(FsDatasetImpl dataset) throws IOException { this.dataset = dataset; - this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory(); ThreadFactory workerFactory = new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat("FsDatasetCache-%d-" + dataset.toString()) .build(); - this.usedBytesCount = new UsedBytesCount(); this.uncachingExecutor = new ThreadPoolExecutor( 0, 1, 60, TimeUnit.SECONDS, @@ -270,7 +173,11 @@ public class FsDatasetCache { ". Reconfigure this to " + minRevocationPollingMs); } this.revocationPollingMs = confRevocationPollingMs; - this.mappableBlockLoader = new MemoryMappableBlockLoader(); + + this.mappableBlockLoader = new MemoryMappableBlockLoader(this); + // Both the lazy writer and the read cache share this statistics object. + this.memCacheStats = new MemoryCacheStats( dataset.datanode.getDnConf().getMaxLockedMemory()); } /** @@ -371,7 +278,7 @@ public class FsDatasetCache { * -1 if we failed. */ long reserve(long count) { - return usedBytesCount.reserve(count); + return memCacheStats.reserve(count); } /** @@ -383,7 +290,7 @@ public class FsDatasetCache { * @return The new number of usedBytes. */ long release(long count) { - return usedBytesCount.release(count); + return memCacheStats.release(count); } /** @@ -395,7 +302,7 @@ public class FsDatasetCache { * @return The new number of usedBytes. */ long releaseRoundDown(long count) { - return usedBytesCount.releaseRoundDown(count); + return memCacheStats.releaseRoundDown(count); } /** @@ -404,14 +311,14 @@ public class FsDatasetCache { * @return the OS page size. */ long getOsPageSize() { - return usedBytesCount.rounder.osPageSize; + return memCacheStats.getPageSize(); } /** * Round up to the OS page size. */ long roundUpPageSize(long count) { - return usedBytesCount.rounder.roundUp(count); + return memCacheStats.roundUpPageSize(count); } /** @@ -437,14 +344,14 @@ public class FsDatasetCache { MappableBlock mappableBlock = null; ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(), key.getBlockId(), length, genstamp); - long newUsedBytes = reserve(length); + long newUsedBytes = mappableBlockLoader.reserve(length); boolean reservedBytes = false; try { if (newUsedBytes < 0) { LOG.warn("Failed to cache " + key + ": could not reserve " + length + " more bytes in the cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY + - " of " + maxBytes + " exceeded."); + " of " + memCacheStats.getCacheCapacity() + " exceeded."); return; } reservedBytes = true; @@ -497,10 +404,10 @@ public class FsDatasetCache { IOUtils.closeQuietly(metaIn); if (!success) { if (reservedBytes) { - release(length); + mappableBlockLoader.release(length); } LOG.debug("Caching of {} was aborted. 
We are now caching only {} " - + "bytes in total.", key, usedBytesCount.get()); + + "bytes in total.", key, memCacheStats.getCacheUsed()); IOUtils.closeQuietly(mappableBlock); numBlocksFailedToCache.incrementAndGet(); @@ -574,7 +481,8 @@ public class FsDatasetCache { synchronized (FsDatasetCache.this) { mappableBlockMap.remove(key); } - long newUsedBytes = release(value.mappableBlock.getLength()); + long newUsedBytes = mappableBlockLoader + .release(value.mappableBlock.getLength()); numBlocksCached.addAndGet(-1); dataset.datanode.getMetrics().incrBlocksUncached(1); if (revocationTimeMs != 0) { @@ -593,14 +501,14 @@ public class FsDatasetCache { * Get the approximate amount of cache space used. */ public long getCacheUsed() { - return usedBytesCount.get(); + return memCacheStats.getCacheUsed(); } /** * Get the maximum amount of bytes we can cache. This is a constant. */ public long getCacheCapacity() { - return maxBytes; + return memCacheStats.getCacheCapacity(); } public long getNumBlocksFailedToCache() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java index a323f78394e..0f5ec2d7022 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java @@ -57,6 +57,26 @@ public abstract class MappableBlockLoader { ExtendedBlockId key) throws IOException; + /** + * Try to reserve the given number of bytes. + * + * @param bytesCount + * The number of bytes to add. + * + * @return The new number of usedBytes if we succeeded; -1 if we failed. + */ + abstract long reserve(long bytesCount); + + /** + * Release some bytes that we're using. + * + * @param bytesCount + * The number of bytes to release. + * + * @return The new number of usedBytes. + */ + abstract long release(long bytesCount); + /** * Reads bytes into a buffer until EOF or the buffer's limit is reached. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryCacheStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryCacheStats.java new file mode 100644 index 00000000000..d276c275259 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryCacheStats.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.io.nativeio.NativeIO; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Keeps statistics for the memory cache. + */ +class MemoryCacheStats { + + /** + * The approximate amount of cache space in use. + * + * This number is an overestimate, counting bytes that will be used only if + * pending caching operations succeed. It does not take into account pending + * uncaching operations. + * + * This overestimate is more useful to the NameNode than an underestimate, + * since we don't want the NameNode to assign us more replicas than we can + * cache, because of the current batch of operations. + */ + private final UsedBytesCount usedBytesCount; + + /** + * The total cache capacity in bytes. + */ + private final long maxBytes; + + MemoryCacheStats(long maxBytes) { + this.usedBytesCount = new UsedBytesCount(); + this.maxBytes = maxBytes; + } + + /** + * Rounds sizes up or down to the operating system page size. + */ + @VisibleForTesting + static class PageRounder { + private final long osPageSize = NativeIO.POSIX.getCacheManipulator() + .getOperatingSystemPageSize(); + + /** + * Round up a number to the operating system page size. + */ + public long roundUp(long count) { + return (count + osPageSize - 1) & (~(osPageSize - 1)); + } + + /** + * Round down a number to the operating system page size. + */ + public long roundDown(long count) { + return count & (~(osPageSize - 1)); + } + } + + /** + * Counts the number of bytes used by the memory cache. + */ + private class UsedBytesCount { + private final AtomicLong usedBytes = new AtomicLong(0); + + private MemoryCacheStats.PageRounder rounder = new PageRounder(); + + /** + * Try to reserve more bytes. + * + * @param count + * The number of bytes to add. We will round this up to the page + * size. + * + * @return The new number of usedBytes if we succeeded; -1 if we failed. 
+ */ + long reserve(long count) { + return usedBytesCount.reserve(count); + } + + /** + * Release some bytes that we're using. + * + * @param count + * The number of bytes to release. We will round this up to the + * page size. + * + * @return The new number of usedBytes. + */ + long release(long count) { + return usedBytesCount.release(count); + } + + /** + * Release some bytes that we're using rounded down to the page size. + * + * @param count + * The number of bytes to release. We will round this down to the + * page size. + * + * @return The new number of usedBytes. + */ + long releaseRoundDown(long count) { + return usedBytesCount.releaseRoundDown(count); + } + + /** + * Get the OS page size. + * + * @return the OS page size. + */ + long getPageSize() { + return usedBytesCount.rounder.osPageSize; + } + + /** + * Round up to the OS page size. + */ + long roundUpPageSize(long count) { + return usedBytesCount.rounder.roundUp(count); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java index 7a0f7c75660..d93193efc1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java @@ -42,6 +42,18 @@ import java.nio.channels.FileChannel; @InterfaceStability.Unstable public class MemoryMappableBlockLoader extends MappableBlockLoader { + private final FsDatasetCache cacheManager; + + /** + * Constructs memory mappable loader. + * + * @param cacheManager + * FsDatasetCache reference. + */ + MemoryMappableBlockLoader(FsDatasetCache cacheManager) { + this.cacheManager = cacheManager; + } + /** * Load the block. * @@ -90,7 +102,7 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader { /** * Verifies the block's checksum. This is an I/O intensive operation. 
*/ - public void verifyChecksum(long length, FileInputStream metaIn, + private void verifyChecksum(long length, FileInputStream metaIn, FileChannel blockChannel, String blockFileName) throws IOException { // Verify the checksum from the block's meta file @@ -139,4 +151,14 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader { IOUtils.closeQuietly(metaChannel); } } + + @Override + long reserve(long bytesCount) { + return cacheManager.reserve(bytesCount); + } + + @Override + long release(long bytesCount) { + return cacheManager.release(bytesCount); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java index 40de32066ae..fd72804804b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestFsDatasetCache; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator; import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator; @@ -50,6 +51,9 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Tests FsDatasetCache behaviors. + */ public class TestFsDatasetCacheRevocation { private static final Logger LOG = LoggerFactory.getLogger( TestFsDatasetCacheRevocation.class); @@ -86,7 +90,7 @@ public class TestFsDatasetCacheRevocation { conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, - new File(sockDir.getDir(), "sock").getAbsolutePath()); + new File(sockDir.getDir(), "sock").getAbsolutePath()); return conf; } @@ -112,19 +116,18 @@ public class TestFsDatasetCacheRevocation { DistributedFileSystem dfs = cluster.getFileSystem(); // Create and cache a file. - final String TEST_FILE = "/test_file"; - DFSTestUtil.createFile(dfs, new Path(TEST_FILE), + final String testFile = "/test_file"; + DFSTestUtil.createFile(dfs, new Path(testFile), BLOCK_SIZE, (short)1, 0xcafe); dfs.addCachePool(new CachePoolInfo("pool")); - long cacheDirectiveId = - dfs.addCacheDirective(new CacheDirectiveInfo.Builder(). - setPool("pool").setPath(new Path(TEST_FILE)). - setReplication((short) 1).build()); + long cacheDirectiveId = dfs + .addCacheDirective(new CacheDirectiveInfo.Builder().setPool("pool") + .setPath(new Path(testFile)).setReplication((short) 1).build()); FsDatasetSpi fsd = cluster.getDataNodes().get(0).getFSDataset(); DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd); // Mmap the file. 
- FSDataInputStream in = dfs.open(new Path(TEST_FILE)); + FSDataInputStream in = dfs.open(new Path(testFile)); ByteBuffer buf = in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class)); @@ -143,8 +146,8 @@ public class TestFsDatasetCacheRevocation { } /** - * Test that when we have an uncache request, and the client refuses to release - * the replica for a long time, we will un-mlock it. + * Test that when we have an uncache request, and the client refuses to + * release the replica for a long time, we will un-mlock it. */ @Test(timeout=120000) public void testRevocation() throws Exception { @@ -163,19 +166,19 @@ public class TestFsDatasetCacheRevocation { DistributedFileSystem dfs = cluster.getFileSystem(); // Create and cache a file. - final String TEST_FILE = "/test_file2"; - DFSTestUtil.createFile(dfs, new Path(TEST_FILE), + final String testFile = "/test_file2"; + DFSTestUtil.createFile(dfs, new Path(testFile), BLOCK_SIZE, (short)1, 0xcafe); dfs.addCachePool(new CachePoolInfo("pool")); long cacheDirectiveId = dfs.addCacheDirective(new CacheDirectiveInfo.Builder(). - setPool("pool").setPath(new Path(TEST_FILE)). + setPool("pool").setPath(new Path(testFile)). setReplication((short) 1).build()); FsDatasetSpi fsd = cluster.getDataNodes().get(0).getFSDataset(); DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd); // Mmap the file. - FSDataInputStream in = dfs.open(new Path(TEST_FILE)); + FSDataInputStream in = dfs.open(new Path(testFile)); ByteBuffer buf = in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java similarity index 98% rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java rename to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java index 2dbd5b9bd01..90605848050 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hdfs.server.datanode; +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import net.jcip.annotations.NotThreadSafe; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; @@ -60,9 +60,11 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; +import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryCacheStats.PageRounder; import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; @@ -101,7 +103,7 @@ public class TestFsDatasetCache { private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class); // Most Linux installs allow a default of 64KB locked memory - static final long CACHE_CAPACITY = 64 * 1024; + public static final long CACHE_CAPACITY = 64 * 1024; // mlock always locks the entire page. So we don't need to deal with this // rounding, use the OS page size for the block size. private static final long PAGE_SIZE =
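
The patch above factors the page-rounded reserve/release accounting out of FsDatasetCache.UsedBytesCount into the new MemoryCacheStats class, which MemoryMappableBlockLoader reaches through its FsDatasetCache reference. The standalone class below is a minimal sketch of that accounting, not part of the patch: the class name is invented and the 4 KB page size is an assumption, whereas the real code obtains the page size from NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize().

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Simplified, hypothetical mirror of the reserve/release accounting that the
 * patch moves into MemoryCacheStats. The page size is hard-coded to 4096 here
 * instead of being queried from the operating system.
 */
public class CacheStatsSketch {
  private static final long PAGE_SIZE = 4096; // assumed; real code asks the OS
  private final AtomicLong usedBytes = new AtomicLong(0);
  private final long maxBytes;

  CacheStatsSketch(long maxBytes) {
    this.maxBytes = maxBytes;
  }

  /** Round count up to a multiple of the page size. */
  static long roundUp(long count) {
    return (count + PAGE_SIZE - 1) & ~(PAGE_SIZE - 1);
  }

  /**
   * Try to reserve count bytes, rounded up to the page size. Returns the new
   * usedBytes on success, or -1 if the capacity would be exceeded. The CAS
   * loop makes the capacity check and the addition atomic.
   */
  long reserve(long count) {
    count = roundUp(count);
    while (true) {
      long cur = usedBytes.get();
      long next = cur + count;
      if (next > maxBytes) {
        return -1;
      }
      if (usedBytes.compareAndSet(cur, next)) {
        return next;
      }
    }
  }

  /** Release count bytes, rounded up to the page size. */
  long release(long count) {
    return usedBytes.addAndGet(-roundUp(count));
  }

  public static void main(String[] args) {
    // 64 KB capacity, matching CACHE_CAPACITY in TestFsDatasetCache.
    CacheStatsSketch stats = new CacheStatsSketch(64 * 1024);
    System.out.println(stats.reserve(100));       // 4096: one page reserved
    System.out.println(stats.reserve(64 * 1024)); // -1: would exceed capacity
    System.out.println(stats.release(100));       // 0: the page is released
  }
}
```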