diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 02ce2f4315f..8ad1652f616 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hdfs.net.DFSNetworkTopology; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReservedSpaceCalculator; import org.apache.hadoop.hdfs.web.URLConnectionFactory; @@ -380,6 +382,23 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DATANODE_CACHE_REVOCATION_POLLING_MS = "dfs.datanode.cache.revocation.polling.ms"; public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L; + // Currently, the available cache loaders are MemoryMappableBlockLoader, + // PmemMappableBlockLoader. MemoryMappableBlockLoader is the default cache + // loader to cache block replica to memory. + public static final String DFS_DATANODE_CACHE_LOADER_CLASS = + "dfs.datanode.cache.loader.class"; + public static final Class + DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT = + MemoryMappableBlockLoader.class; + // Multiple dirs separated by "," are acceptable. + public static final String DFS_DATANODE_CACHE_PMEM_DIRS_KEY = + "dfs.datanode.cache.pmem.dirs"; + public static final String DFS_DATANODE_CACHE_PMEM_DIRS_DEFAULT = ""; + // The cache capacity of persistent memory + public static final String DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY = + "dfs.datanode.cache.pmem.capacity"; + public static final long DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT = 0L; + public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check"; public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 155b800d988..6ee8e9242d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -27,6 +27,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHO import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT; @@ -66,6 +71,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; import org.apache.hadoop.hdfs.server.common.Util; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader; import org.apache.hadoop.security.SaslPropertiesResolver; import java.util.concurrent.TimeUnit; @@ -115,7 +121,10 @@ public class DNConf { final long xceiverStopTimeout; final long restartReplicaExpiry; + private final Class cacheLoaderClass; final long maxLockedMemory; + private final long maxLockedPmem; + private final String[] pmemDirs; private final long bpReadyTimeout; @@ -257,10 +266,20 @@ public class DNConf { DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT); + this.cacheLoaderClass = getConf().getClass(DFS_DATANODE_CACHE_LOADER_CLASS, + DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT, MappableBlockLoader.class); + this.maxLockedMemory = getConf().getLong( DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT); + this.maxLockedPmem = getConf().getLong( + DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY, + DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT); + + this.pmemDirs = getConf().getTrimmedStrings( + DFS_DATANODE_CACHE_PMEM_DIRS_KEY); + this.restartReplicaExpiry = getConf().getLong( DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY, DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L; @@ -323,6 +342,10 @@ public class DNConf { return maxLockedMemory; } + public long getMaxLockedPmem() { + return maxLockedPmem; + } + /** * Returns true if connect to datanode via hostname * @@ -425,4 +448,12 @@ public class DNConf { int getMaxDataLength() { return maxDataLength; } + + public Class getCacheLoaderClass() { + return cacheLoaderClass; + } + + public String[] getPmemVolumes() { + return pmemDirs; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index f2abadaeb2b..fb67cc42313 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -48,10 +49,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.hdfs.ExtendedBlockId; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.DNConf; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,7 +132,11 @@ public class FsDatasetCache { private final long revocationPollingMs; - private final MappableBlockLoader mappableBlockLoader; + /** + * A specific cacheLoader could cache block either to DRAM or + * to persistent memory. + */ + private final MappableBlockLoader cacheLoader; private final MemoryCacheStats memCacheStats; @@ -173,11 +179,41 @@ public class FsDatasetCache { ". Reconfigure this to " + minRevocationPollingMs); } this.revocationPollingMs = confRevocationPollingMs; - - this.mappableBlockLoader = new MemoryMappableBlockLoader(this); // Both lazy writer and read cache are sharing this statistics. this.memCacheStats = new MemoryCacheStats( dataset.datanode.getDnConf().getMaxLockedMemory()); + + Class cacheLoaderClass = + dataset.datanode.getDnConf().getCacheLoaderClass(); + this.cacheLoader = ReflectionUtils.newInstance(cacheLoaderClass, null); + cacheLoader.initialize(this); + } + + /** + * Check if pmem cache is enabled. + */ + private boolean isPmemCacheEnabled() { + return !cacheLoader.isTransientCache(); + } + + DNConf getDnConf() { + return this.dataset.datanode.getDnConf(); + } + + MemoryCacheStats getMemCacheStats() { + return memCacheStats; + } + + /** + * Get the cache path if the replica is cached into persistent memory. + */ + String getReplicaCachePath(String bpid, long blockId) { + if (cacheLoader.isTransientCache() || + !isCached(bpid, blockId)) { + return null; + } + ExtendedBlockId key = new ExtendedBlockId(blockId, bpid); + return cacheLoader.getCachedPath(key); } /** @@ -344,14 +380,14 @@ public class FsDatasetCache { MappableBlock mappableBlock = null; ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(), key.getBlockId(), length, genstamp); - long newUsedBytes = mappableBlockLoader.reserve(length); + long newUsedBytes = cacheLoader.reserve(length); boolean reservedBytes = false; try { if (newUsedBytes < 0) { LOG.warn("Failed to cache " + key + ": could not reserve " + length + " more bytes in the cache: " + - DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY + - " of " + memCacheStats.getCacheCapacity() + " exceeded."); + cacheLoader.getCacheCapacityConfigKey() + + " of " + cacheLoader.getCacheCapacity() + " exceeded."); return; } reservedBytes = true; @@ -370,8 +406,9 @@ public class FsDatasetCache { LOG.warn("Failed to cache " + key + ": failed to open file", e); return; } + try { - mappableBlock = mappableBlockLoader.load(length, blockIn, metaIn, + mappableBlock = cacheLoader.load(length, blockIn, metaIn, blockFileName, key); } catch (ChecksumException e) { // Exception message is bogus since this wasn't caused by a file read @@ -381,6 +418,7 @@ public class FsDatasetCache { LOG.warn("Failed to cache the block [key=" + key + "]!", e); return; } + synchronized (FsDatasetCache.this) { Value value = mappableBlockMap.get(key); Preconditions.checkNotNull(value); @@ -404,7 +442,7 @@ public class FsDatasetCache { IOUtils.closeQuietly(metaIn); if (!success) { if (reservedBytes) { - mappableBlockLoader.release(length); + cacheLoader.release(length); } LOG.debug("Caching of {} was aborted. We are now caching only {} " + "bytes in total.", key, memCacheStats.getCacheUsed()); @@ -481,8 +519,7 @@ public class FsDatasetCache { synchronized (FsDatasetCache.this) { mappableBlockMap.remove(key); } - long newUsedBytes = mappableBlockLoader - .release(value.mappableBlock.getLength()); + long newUsedBytes = cacheLoader.release(value.mappableBlock.getLength()); numBlocksCached.addAndGet(-1); dataset.datanode.getMetrics().incrBlocksUncached(1); if (revocationTimeMs != 0) { @@ -498,19 +535,41 @@ public class FsDatasetCache { // Stats related methods for FSDatasetMBean /** - * Get the approximate amount of cache space used. + * Get the approximate amount of DRAM cache space used. */ public long getCacheUsed() { return memCacheStats.getCacheUsed(); } /** - * Get the maximum amount of bytes we can cache. This is a constant. + * Get the approximate amount of persistent memory cache space used. + * TODO: advertise this metric to NameNode by FSDatasetMBean + */ + public long getPmemCacheUsed() { + if (isPmemCacheEnabled()) { + return cacheLoader.getCacheUsed(); + } + return 0; + } + + /** + * Get the maximum amount of bytes we can cache on DRAM. This is a constant. */ public long getCacheCapacity() { return memCacheStats.getCacheCapacity(); } + /** + * Get cache capacity of persistent memory. + * TODO: advertise this metric to NameNode by FSDatasetMBean + */ + public long getPmemCacheCapacity() { + if (isPmemCacheEnabled()) { + return cacheLoader.getCacheCapacity(); + } + return 0; + } + public long getNumBlocksFailedToCache() { return numBlocksFailedToCache.get(); } @@ -528,4 +587,9 @@ public class FsDatasetCache { Value val = mappableBlockMap.get(block); return (val != null) && val.state.shouldAdvertise(); } + + @VisibleForTesting + MappableBlockLoader getCacheLoader() { + return cacheLoader; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index a61fbb1c5a1..29c31eff5e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -800,11 +800,25 @@ class FsDatasetImpl implements FsDatasetSpi { datanode.getMetrics().incrRamDiskBlocksReadHits(); } - if (info != null) { - return info.getDataInputStream(seekOffset); - } else { + if (info == null || !info.blockDataExists()) { throw new IOException("No data exists for block " + b); } + return getBlockInputStreamWithCheckingPmemCache(info, b, seekOffset); + } + + /** + * Check whether the replica is cached to persistent memory. + * If so, get DataInputStream of the corresponding cache file on pmem. + */ + private InputStream getBlockInputStreamWithCheckingPmemCache( + ReplicaInfo info, ExtendedBlock b, long seekOffset) throws IOException { + String cachePath = cacheManager.getReplicaCachePath( + b.getBlockPoolId(), b.getBlockId()); + if (cachePath != null) { + return FsDatasetUtil.getInputStreamAndSeek( + new File(cachePath), seekOffset); + } + return info.getDataInputStream(seekOffset); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java index 8a3b237e417..92c088860df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java @@ -27,6 +27,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; import java.net.URI; +import java.nio.channels.Channels; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Arrays; import com.google.common.base.Preconditions; @@ -116,6 +119,19 @@ public class FsDatasetUtil { } } + public static InputStream getInputStreamAndSeek(File file, long offset) + throws IOException { + RandomAccessFile raf = null; + try { + raf = new RandomAccessFile(file, "r"); + raf.seek(offset); + return Channels.newInputStream(raf.getChannel()); + } catch(IOException ioe) { + IOUtils.cleanupWithLogger(null, raf); + throw ioe; + } + } + /** * Find the meta-file for the specified block file and then return the * generation stamp from the name of the meta-file. Generally meta file will @@ -183,4 +199,15 @@ public class FsDatasetUtil { FsDatasetImpl.computeChecksum(wrapper, dstMeta, smallBufferSize, conf); } + + public static void deleteMappedFile(String filePath) throws IOException { + if (filePath == null) { + throw new IOException("The filePath should not be null!"); + } + boolean result = Files.deleteIfExists(Paths.get(filePath)); + if (!result) { + throw new IOException( + "Failed to delete the mapped file: " + filePath); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java index 0f5ec2d7022..a9e9610172c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java @@ -34,6 +34,11 @@ import java.nio.channels.FileChannel; @InterfaceStability.Unstable public abstract class MappableBlockLoader { + /** + * Initialize a specific MappableBlockLoader. + */ + abstract void initialize(FsDatasetCache cacheManager) throws IOException; + /** * Load the block. * @@ -60,23 +65,47 @@ public abstract class MappableBlockLoader { /** * Try to reserve some given bytes. * - * @param bytesCount - * The number of bytes to add. + * @param bytesCount The number of bytes to add. * - * @return The new number of usedBytes if we succeeded; -1 if we failed. + * @return The new number of usedBytes if we succeeded; + * -1 if we failed. */ abstract long reserve(long bytesCount); /** * Release some bytes that we're using. * - * @param bytesCount - * The number of bytes to release. + * @param bytesCount The number of bytes to release. * - * @return The new number of usedBytes. + * @return The new number of usedBytes. */ abstract long release(long bytesCount); + /** + * Get the config key of cache capacity. + */ + abstract String getCacheCapacityConfigKey(); + + /** + * Get the approximate amount of cache space used. + */ + abstract long getCacheUsed(); + + /** + * Get the maximum amount of cache bytes. + */ + abstract long getCacheCapacity(); + + /** + * Check whether the cache is non-volatile. + */ + abstract boolean isTransientCache(); + + /** + * Get a cache file path if applicable. Otherwise return null. + */ + abstract String getCachedPath(ExtendedBlockId key); + /** * Reads bytes into a buffer until EOF or the buffer's limit is reached. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java index d93193efc1f..4b7af19513e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.io.nativeio.NativeIO; @@ -42,16 +43,11 @@ import java.nio.channels.FileChannel; @InterfaceStability.Unstable public class MemoryMappableBlockLoader extends MappableBlockLoader { - private final FsDatasetCache cacheManager; + private MemoryCacheStats memCacheStats; - /** - * Constructs memory mappable loader. - * - * @param cacheManager - * FsDatasetCache reference. - */ - MemoryMappableBlockLoader(FsDatasetCache cacheManager) { - this.cacheManager = cacheManager; + @Override + void initialize(FsDatasetCache cacheManager) throws IOException { + this.memCacheStats = cacheManager.getMemCacheStats(); } /** @@ -72,7 +68,7 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader { * @return The Mappable block. */ @Override - public MappableBlock load(long length, FileInputStream blockIn, + MappableBlock load(long length, FileInputStream blockIn, FileInputStream metaIn, String blockFileName, ExtendedBlockId key) throws IOException { @@ -152,13 +148,38 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader { } } + @Override + public String getCacheCapacityConfigKey() { + return DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; + } + + @Override + public long getCacheUsed() { + return memCacheStats.getCacheUsed(); + } + + @Override + public long getCacheCapacity() { + return memCacheStats.getCacheCapacity(); + } + @Override long reserve(long bytesCount) { - return cacheManager.reserve(bytesCount); + return memCacheStats.reserve(bytesCount); } @Override long release(long bytesCount) { - return cacheManager.release(bytesCount); + return memCacheStats.release(bytesCount); + } + + @Override + public boolean isTransientCache() { + return true; + } + + @Override + public String getCachedPath(ExtendedBlockId key) { + return null; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java new file mode 100644 index 00000000000..c581d3101a4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.DNConf; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.util.DataChecksum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; + +/** + * Maps block to persistent memory by using mapped byte buffer. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class PmemMappableBlockLoader extends MappableBlockLoader { + private static final Logger LOG = + LoggerFactory.getLogger(PmemMappableBlockLoader.class); + private PmemVolumeManager pmemVolumeManager; + + @Override + void initialize(FsDatasetCache cacheManager) throws IOException { + DNConf dnConf = cacheManager.getDnConf(); + this.pmemVolumeManager = new PmemVolumeManager(dnConf.getMaxLockedPmem(), + dnConf.getPmemVolumes()); + } + + @VisibleForTesting + PmemVolumeManager getPmemVolumeManager() { + return pmemVolumeManager; + } + + /** + * Load the block. + * + * Map the block and verify its checksum. + * + * The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir + * is a persistent memory volume selected by getOneLocation() method. + * + * @param length The current length of the block. + * @param blockIn The block input stream. Should be positioned at the + * start. The caller must close this. + * @param metaIn The meta file input stream. Should be positioned at + * the start. The caller must close this. + * @param blockFileName The block file name, for logging purposes. + * @param key The extended block ID. + * + * @throws IOException If mapping block fails or checksum fails. + * + * @return The Mappable block. + */ + @Override + MappableBlock load(long length, FileInputStream blockIn, + FileInputStream metaIn, String blockFileName, + ExtendedBlockId key) + throws IOException { + PmemMappedBlock mappableBlock = null; + String filePath = null; + + FileChannel blockChannel = null; + RandomAccessFile file = null; + MappedByteBuffer out = null; + try { + blockChannel = blockIn.getChannel(); + if (blockChannel == null) { + throw new IOException("Block InputStream has no FileChannel."); + } + + Byte volumeIndex = pmemVolumeManager.getOneVolumeIndex(); + filePath = pmemVolumeManager.inferCacheFilePath(volumeIndex, key); + file = new RandomAccessFile(filePath, "rw"); + out = file.getChannel(). + map(FileChannel.MapMode.READ_WRITE, 0, length); + if (out == null) { + throw new IOException("Failed to map the block " + blockFileName + + " to persistent storage."); + } + verifyChecksumAndMapBlock(out, length, metaIn, blockChannel, + blockFileName); + mappableBlock = new PmemMappedBlock( + length, volumeIndex, key, pmemVolumeManager); + pmemVolumeManager.afterCache(key, volumeIndex); + LOG.info("Successfully cached one replica:{} into persistent memory" + + ", [cached path={}, length={}]", key, filePath, length); + } finally { + IOUtils.closeQuietly(blockChannel); + if (out != null) { + NativeIO.POSIX.munmap(out); + } + IOUtils.closeQuietly(file); + if (mappableBlock == null) { + FsDatasetUtil.deleteMappedFile(filePath); + } + } + return mappableBlock; + } + + /** + * Verifies the block's checksum meanwhile maps block to persistent memory. + * This is an I/O intensive operation. + */ + private void verifyChecksumAndMapBlock( + MappedByteBuffer out, long length, FileInputStream metaIn, + FileChannel blockChannel, String blockFileName) + throws IOException { + // Verify the checksum from the block's meta file + // Get the DataChecksum from the meta file header + BlockMetadataHeader header = + BlockMetadataHeader.readHeader(new DataInputStream( + new BufferedInputStream(metaIn, BlockMetadataHeader + .getHeaderSize()))); + FileChannel metaChannel = null; + try { + metaChannel = metaIn.getChannel(); + if (metaChannel == null) { + throw new IOException("Cannot get FileChannel from " + + "Block InputStream meta file."); + } + DataChecksum checksum = header.getChecksum(); + final int bytesPerChecksum = checksum.getBytesPerChecksum(); + final int checksumSize = checksum.getChecksumSize(); + final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum; + ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum); + ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize); + // Verify the checksum + int bytesVerified = 0; + while (bytesVerified < length) { + Preconditions.checkState(bytesVerified % bytesPerChecksum == 0, + "Unexpected partial chunk before EOF"); + assert bytesVerified % bytesPerChecksum == 0; + int bytesRead = fillBuffer(blockChannel, blockBuf); + if (bytesRead == -1) { + throw new IOException( + "Checksum verification failed for the block " + blockFileName + + ": premature EOF"); + } + blockBuf.flip(); + // Number of read chunks, including partial chunk at end + int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum; + checksumBuf.limit(chunks * checksumSize); + fillBuffer(metaChannel, checksumBuf); + checksumBuf.flip(); + checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName, + bytesVerified); + + // / Copy data to persistent file + out.put(blockBuf); + // positioning the + bytesVerified += bytesRead; + + // Clear buffer + blockBuf.clear(); + checksumBuf.clear(); + } + // Forces to write data to storage device containing the mapped file + out.force(); + } finally { + IOUtils.closeQuietly(metaChannel); + } + } + + @Override + public String getCacheCapacityConfigKey() { + return DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY; + } + + @Override + public long getCacheUsed() { + return pmemVolumeManager.getCacheUsed(); + } + + @Override + public long getCacheCapacity() { + return pmemVolumeManager.getCacheCapacity(); + } + + @Override + long reserve(long bytesCount) { + return pmemVolumeManager.reserve(bytesCount); + } + + @Override + long release(long bytesCount) { + return pmemVolumeManager.release(bytesCount); + } + + @Override + public boolean isTransientCache() { + return false; + } + + @Override + public String getCachedPath(ExtendedBlockId key) { + return pmemVolumeManager.getCacheFilePath(key); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java new file mode 100644 index 00000000000..ce4fa22137a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Represents an HDFS block that is mapped to persistent memory by DataNode + * with mapped byte buffer. PMDK is NOT involved in this implementation. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class PmemMappedBlock implements MappableBlock { + private static final Logger LOG = + LoggerFactory.getLogger(PmemMappedBlock.class); + private final PmemVolumeManager pmemVolumeManager; + private long length; + private Byte volumeIndex = null; + private ExtendedBlockId key; + + PmemMappedBlock(long length, Byte volumeIndex, ExtendedBlockId key, + PmemVolumeManager pmemVolumeManager) { + assert length > 0; + this.length = length; + this.volumeIndex = volumeIndex; + this.key = key; + this.pmemVolumeManager = pmemVolumeManager; + } + + @Override + public long getLength() { + return length; + } + + @Override + public void close() { + String cacheFilePath = + pmemVolumeManager.inferCacheFilePath(volumeIndex, key); + try { + FsDatasetUtil.deleteMappedFile(cacheFilePath); + pmemVolumeManager.afterUncache(key); + LOG.info("Successfully uncached one replica:{} from persistent memory" + + ", [cached path={}, length={}]", key, cacheFilePath, length); + } catch (IOException e) { + LOG.warn("Failed to delete the mapped File: {}!", cacheFilePath, e); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java new file mode 100644 index 00000000000..76aa2dd5c80 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Manage the persistent memory volumes. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class PmemVolumeManager { + + /** + * Counts used bytes for persistent memory. + */ + private class UsedBytesCount { + private final AtomicLong usedBytes = new AtomicLong(0); + + /** + * Try to reserve more bytes. + * + * @param bytesCount The number of bytes to add. + * + * @return The new number of usedBytes if we succeeded; + * -1 if we failed. + */ + long reserve(long bytesCount) { + while (true) { + long cur = usedBytes.get(); + long next = cur + bytesCount; + if (next > cacheCapacity) { + return -1; + } + if (usedBytes.compareAndSet(cur, next)) { + return next; + } + } + } + + /** + * Release some bytes that we're using. + * + * @param bytesCount The number of bytes to release. + * + * @return The new number of usedBytes. + */ + long release(long bytesCount) { + return usedBytes.addAndGet(-bytesCount); + } + + long get() { + return usedBytes.get(); + } + } + + private static final Logger LOG = + LoggerFactory.getLogger(PmemVolumeManager.class); + private final ArrayList pmemVolumes = new ArrayList<>(); + // Maintain which pmem volume a block is cached to. + private final Map blockKeyToVolume = + new ConcurrentHashMap<>(); + private final UsedBytesCount usedBytesCount; + + /** + * The total cache capacity in bytes of persistent memory. + * It is 0L if the specific mappableBlockLoader couldn't cache data to pmem. + */ + private final long cacheCapacity; + private int count = 0; + // Strict atomic operation is not guaranteed for the performance sake. + private int i = 0; + + PmemVolumeManager(long maxBytes, String[] pmemVolumesConfigured) + throws IOException { + if (pmemVolumesConfigured == null || pmemVolumesConfigured.length == 0) { + throw new IOException("The persistent memory volume, " + + DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY + + " is not configured!"); + } + this.loadVolumes(pmemVolumesConfigured); + this.usedBytesCount = new UsedBytesCount(); + this.cacheCapacity = maxBytes; + } + + public long getCacheUsed() { + return usedBytesCount.get(); + } + + public long getCacheCapacity() { + return cacheCapacity; + } + + /** + * Try to reserve more bytes on persistent memory. + * + * @param bytesCount The number of bytes to add. + * + * @return The new number of usedBytes if we succeeded; + * -1 if we failed. + */ + long reserve(long bytesCount) { + return usedBytesCount.reserve(bytesCount); + } + + /** + * Release some bytes that we're using on persistent memory. + * + * @param bytesCount The number of bytes to release. + * + * @return The new number of usedBytes. + */ + long release(long bytesCount) { + return usedBytesCount.release(bytesCount); + } + + /** + * Load and verify the configured pmem volumes. + * + * @throws IOException If there is no available pmem volume. + */ + private void loadVolumes(String[] volumes) throws IOException { + // Check whether the volume exists + for (String volume: volumes) { + try { + File pmemDir = new File(volume); + verifyIfValidPmemVolume(pmemDir); + // Remove all files under the volume. + FileUtils.cleanDirectory(pmemDir); + } catch (IllegalArgumentException e) { + LOG.error("Failed to parse persistent memory volume " + volume, e); + continue; + } catch (IOException e) { + LOG.error("Bad persistent memory volume: " + volume, e); + continue; + } + pmemVolumes.add(volume); + LOG.info("Added persistent memory - " + volume); + } + count = pmemVolumes.size(); + if (count == 0) { + throw new IOException( + "At least one valid persistent memory volume is required!"); + } + } + + @VisibleForTesting + static void verifyIfValidPmemVolume(File pmemDir) + throws IOException { + if (!pmemDir.exists()) { + final String message = pmemDir + " does not exist"; + throw new IOException(message); + } + + if (!pmemDir.isDirectory()) { + final String message = pmemDir + " is not a directory"; + throw new IllegalArgumentException(message); + } + + String uuidStr = UUID.randomUUID().toString(); + String testFilePath = pmemDir.getPath() + "/.verify.pmem." + uuidStr; + byte[] contents = uuidStr.getBytes("UTF-8"); + RandomAccessFile testFile = null; + MappedByteBuffer out = null; + try { + testFile = new RandomAccessFile(testFilePath, "rw"); + out = testFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, + contents.length); + if (out == null) { + throw new IOException("Failed to map the test file under " + pmemDir); + } + out.put(contents); + // Forces to write data to storage device containing the mapped file + out.force(); + } catch (IOException e) { + throw new IOException( + "Exception while writing data to persistent storage dir: " + + pmemDir, e); + } finally { + if (out != null) { + out.clear(); + } + if (testFile != null) { + IOUtils.closeQuietly(testFile); + NativeIO.POSIX.munmap(out); + try { + FsDatasetUtil.deleteMappedFile(testFilePath); + } catch (IOException e) { + LOG.warn("Failed to delete test file " + testFilePath + + " from persistent memory", e); + } + } + } + } + + /** + * Choose a persistent memory volume based on a specific algorithm. + * Currently it is a round-robin policy. + * + * TODO: Refine volume selection policy by considering storage utilization. + */ + Byte getOneVolumeIndex() throws IOException { + if (count != 0) { + return (byte)(i++ % count); + } else { + throw new IOException("No usable persistent memory is found"); + } + } + + @VisibleForTesting + String getVolumeByIndex(Byte index) { + return pmemVolumes.get(index); + } + + /** + * The cache file is named as BlockPoolId-BlockId. + * So its name can be inferred by BlockPoolId and BlockId. + */ + public String getCacheFileName(ExtendedBlockId key) { + return key.getBlockPoolId() + "-" + key.getBlockId(); + } + + /** + * Considering the pmem volume size is below TB level currently, + * it is tolerable to keep cache files under one directory. + * The strategy will be optimized, especially if one pmem volume + * has huge cache capacity. + * + * @param volumeIndex The index of pmem volume where a replica will be + * cached to or has been cached to. + * + * @param key The replica's ExtendedBlockId. + * + * @return A path to which the block replica is mapped. + */ + public String inferCacheFilePath(Byte volumeIndex, ExtendedBlockId key) { + return pmemVolumes.get(volumeIndex) + "/" + getCacheFileName(key); + } + + /** + * The cache file path is pmemVolume/BlockPoolId-BlockId. + */ + public String getCacheFilePath(ExtendedBlockId key) { + Byte volumeIndex = blockKeyToVolume.get(key); + if (volumeIndex == null) { + return null; + } + return inferCacheFilePath(volumeIndex, key); + } + + @VisibleForTesting + Map getBlockKeyToVolume() { + return blockKeyToVolume; + } + + /** + * Add cached block's ExtendedBlockId and its cache volume index to a map + * after cache. + */ + public void afterCache(ExtendedBlockId key, Byte volumeIndex) { + blockKeyToVolume.put(key, volumeIndex); + } + + /** + * Remove the record in blockKeyToVolume for uncached block after uncache. + */ + public void afterUncache(ExtendedBlockId key) { + blockKeyToVolume.remove(key); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index ada4de0bf7e..3fcb3879de3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2510,6 +2510,18 @@ + + dfs.datanode.cache.loader.class + org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader + + Currently, the available cache loaders are MemoryMappableBlockLoader, PmemMappableBlockLoader. + By default, MemoryMappableBlockLoader is used and it maps block replica into memory. + PmemMappableBlockLoader can map block to persistent memory with mapped byte buffer, which is + implemented by Java code. The value of dfs.datanode.cache.pmem.dirs specifies the persistent + memory directory. + + + dfs.datanode.max.locked.memory 0 @@ -2526,6 +2538,28 @@ + + dfs.datanode.cache.pmem.capacity + 0 + + The amount of persistent memory in bytes that can be used to cache block + replicas to persistent memory. Currently, persistent memory is only enabled + in HDFS Centralized Cache Management feature. + + By default, this parameter is 0, which disables persistent memory caching. + + + + + dfs.datanode.cache.pmem.dirs + + + This value specifies the persistent memory directory used for caching block + replica. It matters only if the value of dfs.datanode.cache.loader.class is + PmemMappableBlockLoader. Multiple directories separated by "," are acceptable. + + + dfs.namenode.list.cache.directives.num.responses 100 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java new file mode 100644 index 00000000000..9b4f06fc9db --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.HdfsBlockLocation; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator; +import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.MetricsAsserts; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.event.Level; + +import com.google.common.base.Supplier; +import com.google.common.primitives.Ints; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY; + +/** + * Tests HDFS persistent memory cache by PmemMappableBlockLoader. + * + * Bogus persistent memory volume is used to cache blocks. + */ +public class TestCacheByPmemMappableBlockLoader { + protected static final org.slf4j.Logger LOG = + LoggerFactory.getLogger(TestCacheByPmemMappableBlockLoader.class); + + protected static final long CACHE_CAPACITY = 64 * 1024; + protected static final long BLOCK_SIZE = 4 * 1024; + + private static Configuration conf; + private static MiniDFSCluster cluster = null; + private static DistributedFileSystem fs; + private static DataNode dn; + private static FsDatasetCache cacheManager; + private static PmemMappableBlockLoader cacheLoader; + /** + * Used to pause DN BPServiceActor threads. BPSA threads acquire the + * shared read lock. The test acquires the write lock for exclusive access. + */ + private static ReadWriteLock lock = new ReentrantReadWriteLock(true); + private static CacheManipulator prevCacheManipulator; + private static DataNodeFaultInjector oldInjector; + + private static final String PMEM_DIR_0 = + MiniDFSCluster.getBaseDirectory() + "pmem0"; + private static final String PMEM_DIR_1 = + MiniDFSCluster.getBaseDirectory() + "pmem1"; + + static { + GenericTestUtils.setLogLevel( + LoggerFactory.getLogger(FsDatasetCache.class), Level.DEBUG); + } + + @BeforeClass + public static void setUpClass() throws Exception { + oldInjector = DataNodeFaultInjector.get(); + DataNodeFaultInjector.set(new DataNodeFaultInjector() { + @Override + public void startOfferService() throws Exception { + lock.readLock().lock(); + } + + @Override + public void endOfferService() throws Exception { + lock.readLock().unlock(); + } + }); + } + + @AfterClass + public static void tearDownClass() throws Exception { + DataNodeFaultInjector.set(oldInjector); + } + + @Before + public void setUp() throws Exception { + conf = new HdfsConfiguration(); + conf.setLong( + DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100); + conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, + CACHE_CAPACITY); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10); + + // Configuration for pmem cache + conf.set(DFS_DATANODE_CACHE_LOADER_CLASS, + "org.apache.hadoop.hdfs.server.datanode." + + "fsdataset.impl.PmemMappableBlockLoader"); + new File(PMEM_DIR_0).getAbsoluteFile().mkdir(); + new File(PMEM_DIR_1).getAbsoluteFile().mkdir(); + // Configure two bogus pmem volumes + conf.set(DFS_DATANODE_CACHE_PMEM_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1); + conf.setLong(DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY, CACHE_CAPACITY); + + prevCacheManipulator = NativeIO.POSIX.getCacheManipulator(); + NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator()); + + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1).build(); + cluster.waitActive(); + + fs = cluster.getFileSystem(); + dn = cluster.getDataNodes().get(0); + cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager; + cacheLoader = (PmemMappableBlockLoader) cacheManager.getCacheLoader(); + } + + @After + public void tearDown() throws Exception { + if (fs != null) { + fs.close(); + fs = null; + } + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + NativeIO.POSIX.setCacheManipulator(prevCacheManipulator); + } + + protected static void shutdownCluster() { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + @Test + public void testPmemVolumeManager() throws IOException { + PmemVolumeManager pmemVolumeManager = + cacheLoader.getPmemVolumeManager(); + assertNotNull(pmemVolumeManager); + assertEquals(CACHE_CAPACITY, pmemVolumeManager.getCacheCapacity()); + // Test round-robin selection policy + long count1 = 0, count2 = 0; + for (int i = 0; i < 10; i++) { + Byte index = pmemVolumeManager.getOneVolumeIndex(); + String volume = pmemVolumeManager.getVolumeByIndex(index); + if (volume.equals(PMEM_DIR_0)) { + count1++; + } else if (volume.equals(PMEM_DIR_1)) { + count2++; + } else { + fail("Unexpected persistent storage location:" + volume); + } + } + assertEquals(count1, count2); + } + + public List getExtendedBlockId(Path filePath, long fileLen) + throws IOException { + List keys = new ArrayList<>(); + HdfsBlockLocation[] locs = + (HdfsBlockLocation[]) fs.getFileBlockLocations(filePath, 0, fileLen); + for (HdfsBlockLocation loc : locs) { + long bkid = loc.getLocatedBlock().getBlock().getBlockId(); + String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId(); + keys.add(new ExtendedBlockId(bkid, bpid)); + } + return keys; + } + + @Test(timeout = 60000) + public void testCacheAndUncache() throws Exception { + final int maxCacheBlocksNum = + Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE); + BlockReaderTestUtil.enableHdfsCachingTracing(); + Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE); + assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheCapacity()); + + final Path testFile = new Path("/testFile"); + final long testFileLen = maxCacheBlocksNum * BLOCK_SIZE; + DFSTestUtil.createFile(fs, testFile, + testFileLen, (short) 1, 0xbeef); + List blockKeys = + getExtendedBlockId(testFile, testFileLen); + fs.addCachePool(new CachePoolInfo("testPool")); + final long cacheDirectiveId = fs.addCacheDirective( + new CacheDirectiveInfo.Builder().setPool("testPool"). + setPath(testFile).setReplication((short) 1).build()); + // wait for caching + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name()); + long blocksCached = + MetricsAsserts.getLongCounter("BlocksCached", dnMetrics); + if (blocksCached != maxCacheBlocksNum) { + LOG.info("waiting for " + maxCacheBlocksNum + " blocks to " + + "be cached. Right now " + blocksCached + " blocks are cached."); + return false; + } + LOG.info(maxCacheBlocksNum + " blocks are now cached."); + return true; + } + }, 1000, 30000); + + // The pmem cache space is expected to have been used up. + assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheUsed()); + Map blockKeyToVolume = + cacheLoader.getPmemVolumeManager().getBlockKeyToVolume(); + // All block keys should be kept in blockKeyToVolume + assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum); + assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys)); + // Test each replica's cache file path + for (ExtendedBlockId key : blockKeys) { + String cachePath = cacheManager. + getReplicaCachePath(key.getBlockPoolId(), key.getBlockId()); + // The cachePath shouldn't be null if the replica has been cached + // to pmem. + assertNotNull(cachePath); + String expectFileName = + cacheLoader.getPmemVolumeManager().getCacheFileName(key); + if (cachePath.startsWith(PMEM_DIR_0)) { + assertTrue(cachePath.equals(PMEM_DIR_0 + "/" + expectFileName)); + } else if (cachePath.startsWith(PMEM_DIR_1)) { + assertTrue(cachePath.equals(PMEM_DIR_1 + "/" + expectFileName)); + } else { + fail("The cache path is not the expected one: " + cachePath); + } + } + + // Try to cache another file. Caching this file should fail + // due to lack of available cache space. + final Path smallTestFile = new Path("/smallTestFile"); + final long smallTestFileLen = BLOCK_SIZE; + DFSTestUtil.createFile(fs, smallTestFile, + smallTestFileLen, (short) 1, 0xbeef); + // Try to cache more blocks when no cache space is available. + final long smallFileCacheDirectiveId = fs.addCacheDirective( + new CacheDirectiveInfo.Builder().setPool("testPool"). + setPath(smallTestFile).setReplication((short) 1).build()); + + // Wait for enough time to verify smallTestFile could not be cached. + Thread.sleep(10000); + MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name()); + long blocksCached = + MetricsAsserts.getLongCounter("BlocksCached", dnMetrics); + // The cached block num should not be increased. + assertTrue(blocksCached == maxCacheBlocksNum); + // The blockKeyToVolume should just keep the block keys for the testFile. + assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum); + assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys)); + // Stop trying to cache smallTestFile to avoid interfering the + // verification for uncache functionality. + fs.removeCacheDirective(smallFileCacheDirectiveId); + + // Uncache the test file + fs.removeCacheDirective(cacheDirectiveId); + // Wait for uncaching + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name()); + long blocksUncached = + MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics); + if (blocksUncached != maxCacheBlocksNum) { + LOG.info("waiting for " + maxCacheBlocksNum + " blocks to be " + + "uncached. Right now " + blocksUncached + + " blocks are uncached."); + return false; + } + LOG.info(maxCacheBlocksNum + " blocks have been uncached."); + return true; + } + }, 1000, 30000); + + // It is expected that no pmem cache space is used. + assertEquals(0, cacheManager.getPmemCacheUsed()); + // No record should be kept by blockKeyToVolume after testFile is uncached. + assertEquals(blockKeyToVolume.size(), 0); + } +} \ No newline at end of file