HDFS-14355 : Implement HDFS cache on SCM by using pure java mapped byte buffer. Contributed by Feilong He.

This commit is contained in:
Uma Maheswara Rao G 2019-03-30 23:33:25 -07:00
parent bf3b7fd732
commit 35ff31dd94
12 changed files with 1208 additions and 34 deletions

View File

@ -26,6 +26,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReservedSpaceCalculator;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
@ -390,6 +392,23 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_CACHE_REVOCATION_POLLING_MS = "dfs.datanode.cache.revocation.polling.ms";
public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L;
// Currently, the available cache loaders are MemoryMappableBlockLoader,
// PmemMappableBlockLoader. MemoryMappableBlockLoader is the default cache
// loader to cache block replica to memory.
public static final String DFS_DATANODE_CACHE_LOADER_CLASS =
"dfs.datanode.cache.loader.class";
public static final Class<? extends MappableBlockLoader>
DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT =
MemoryMappableBlockLoader.class;
// Multiple dirs separated by "," are acceptable.
public static final String DFS_DATANODE_CACHE_PMEM_DIRS_KEY =
"dfs.datanode.cache.pmem.dirs";
public static final String DFS_DATANODE_CACHE_PMEM_DIRS_DEFAULT = "";
// The cache capacity of persistent memory
public static final String DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY =
"dfs.datanode.cache.pmem.capacity";
public static final long DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT = 0L;
public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;

View File

@ -27,6 +27,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHO
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
@ -64,6 +69,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
import org.apache.hadoop.security.SaslPropertiesResolver;
import java.util.concurrent.TimeUnit;
@ -112,7 +118,10 @@ public class DNConf {
final long xceiverStopTimeout;
final long restartReplicaExpiry;
private final Class<? extends MappableBlockLoader> cacheLoaderClass;
final long maxLockedMemory;
private final long maxLockedPmem;
private final String[] pmemDirs;
private final long bpReadyTimeout;
@ -251,10 +260,20 @@ public class DNConf {
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
this.cacheLoaderClass = getConf().getClass(DFS_DATANODE_CACHE_LOADER_CLASS,
DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT, MappableBlockLoader.class);
this.maxLockedMemory = getConf().getLongBytes(
DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
this.maxLockedPmem = getConf().getLongBytes(
DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY,
DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT);
this.pmemDirs = getConf().getTrimmedStrings(
DFS_DATANODE_CACHE_PMEM_DIRS_KEY);
this.restartReplicaExpiry = getConf().getLong(
DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
@ -317,6 +336,10 @@ public class DNConf {
return maxLockedMemory;
}
public long getMaxLockedPmem() {
return maxLockedPmem;
}
/**
* Returns true if connect to datanode via hostname
*
@ -419,4 +442,12 @@ public class DNConf {
int getMaxDataLength() {
return maxDataLength;
}
public Class<? extends MappableBlockLoader> getCacheLoaderClass() {
return cacheLoaderClass;
}
public String[] getPmemVolumes() {
return pmemDirs;
}
}

View File

@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -48,10 +49,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -130,7 +132,11 @@ public class FsDatasetCache {
private final long revocationPollingMs;
private final MappableBlockLoader mappableBlockLoader;
/**
* A specific cacheLoader could cache block either to DRAM or
* to persistent memory.
*/
private final MappableBlockLoader cacheLoader;
private final MemoryCacheStats memCacheStats;
@ -173,11 +179,41 @@ public class FsDatasetCache {
". Reconfigure this to " + minRevocationPollingMs);
}
this.revocationPollingMs = confRevocationPollingMs;
this.mappableBlockLoader = new MemoryMappableBlockLoader(this);
// Both lazy writer and read cache are sharing this statistics.
this.memCacheStats = new MemoryCacheStats(
dataset.datanode.getDnConf().getMaxLockedMemory());
Class<? extends MappableBlockLoader> cacheLoaderClass =
dataset.datanode.getDnConf().getCacheLoaderClass();
this.cacheLoader = ReflectionUtils.newInstance(cacheLoaderClass, null);
cacheLoader.initialize(this);
}
/**
* Check if pmem cache is enabled.
*/
private boolean isPmemCacheEnabled() {
return !cacheLoader.isTransientCache();
}
DNConf getDnConf() {
return this.dataset.datanode.getDnConf();
}
MemoryCacheStats getMemCacheStats() {
return memCacheStats;
}
/**
* Get the cache path if the replica is cached into persistent memory.
*/
String getReplicaCachePath(String bpid, long blockId) {
if (cacheLoader.isTransientCache() ||
!isCached(bpid, blockId)) {
return null;
}
ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
return cacheLoader.getCachedPath(key);
}
/**
@ -344,14 +380,14 @@ public class FsDatasetCache {
MappableBlock mappableBlock = null;
ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
key.getBlockId(), length, genstamp);
long newUsedBytes = mappableBlockLoader.reserve(length);
long newUsedBytes = cacheLoader.reserve(length);
boolean reservedBytes = false;
try {
if (newUsedBytes < 0) {
LOG.warn("Failed to cache " + key + ": could not reserve " + length +
" more bytes in the cache: " +
DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
" of " + memCacheStats.getCacheCapacity() + " exceeded.");
cacheLoader.getCacheCapacityConfigKey() +
" of " + cacheLoader.getCacheCapacity() + " exceeded.");
return;
}
reservedBytes = true;
@ -370,8 +406,9 @@ public class FsDatasetCache {
LOG.warn("Failed to cache " + key + ": failed to open file", e);
return;
}
try {
mappableBlock = mappableBlockLoader.load(length, blockIn, metaIn,
mappableBlock = cacheLoader.load(length, blockIn, metaIn,
blockFileName, key);
} catch (ChecksumException e) {
// Exception message is bogus since this wasn't caused by a file read
@ -381,6 +418,7 @@ public class FsDatasetCache {
LOG.warn("Failed to cache the block [key=" + key + "]!", e);
return;
}
synchronized (FsDatasetCache.this) {
Value value = mappableBlockMap.get(key);
Preconditions.checkNotNull(value);
@ -404,7 +442,7 @@ public class FsDatasetCache {
IOUtils.closeQuietly(metaIn);
if (!success) {
if (reservedBytes) {
mappableBlockLoader.release(length);
cacheLoader.release(length);
}
LOG.debug("Caching of {} was aborted. We are now caching only {} "
+ "bytes in total.", key, memCacheStats.getCacheUsed());
@ -481,8 +519,7 @@ public class FsDatasetCache {
synchronized (FsDatasetCache.this) {
mappableBlockMap.remove(key);
}
long newUsedBytes = mappableBlockLoader
.release(value.mappableBlock.getLength());
long newUsedBytes = cacheLoader.release(value.mappableBlock.getLength());
numBlocksCached.addAndGet(-1);
dataset.datanode.getMetrics().incrBlocksUncached(1);
if (revocationTimeMs != 0) {
@ -498,19 +535,41 @@ public class FsDatasetCache {
// Stats related methods for FSDatasetMBean
/**
* Get the approximate amount of cache space used.
* Get the approximate amount of DRAM cache space used.
*/
public long getCacheUsed() {
return memCacheStats.getCacheUsed();
}
/**
* Get the maximum amount of bytes we can cache. This is a constant.
* Get the approximate amount of persistent memory cache space used.
* TODO: advertise this metric to NameNode by FSDatasetMBean
*/
public long getPmemCacheUsed() {
if (isPmemCacheEnabled()) {
return cacheLoader.getCacheUsed();
}
return 0;
}
/**
* Get the maximum amount of bytes we can cache on DRAM. This is a constant.
*/
public long getCacheCapacity() {
return memCacheStats.getCacheCapacity();
}
/**
* Get cache capacity of persistent memory.
* TODO: advertise this metric to NameNode by FSDatasetMBean
*/
public long getPmemCacheCapacity() {
if (isPmemCacheEnabled()) {
return cacheLoader.getCacheCapacity();
}
return 0;
}
public long getNumBlocksFailedToCache() {
return numBlocksFailedToCache.get();
}
@ -528,4 +587,9 @@ public class FsDatasetCache {
Value val = mappableBlockMap.get(block);
return (val != null) && val.state.shouldAdvertise();
}
@VisibleForTesting
MappableBlockLoader getCacheLoader() {
return cacheLoader;
}
}

View File

@ -787,11 +787,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
datanode.getMetrics().incrRamDiskBlocksReadHits();
}
if (info != null) {
return info.getDataInputStream(seekOffset);
} else {
if (info == null) {
throw new IOException("No data exists for block " + b);
}
return getBlockInputStreamWithCheckingPmemCache(info, b, seekOffset);
}
/**
* Check whether the replica is cached to persistent memory.
* If so, get DataInputStream of the corresponding cache file on pmem.
*/
private InputStream getBlockInputStreamWithCheckingPmemCache(
ReplicaInfo info, ExtendedBlock b, long seekOffset) throws IOException {
String cachePath = cacheManager.getReplicaCachePath(
b.getBlockPoolId(), b.getBlockId());
if (cachePath != null) {
return FsDatasetUtil.getInputStreamAndSeek(
new File(cachePath), seekOffset);
}
return info.getDataInputStream(seekOffset);
}
/**

View File

@ -27,6 +27,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.URI;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import com.google.common.base.Preconditions;
@ -116,6 +119,19 @@ public class FsDatasetUtil {
}
}
public static InputStream getInputStreamAndSeek(File file, long offset)
throws IOException {
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(file, "r");
raf.seek(offset);
return Channels.newInputStream(raf.getChannel());
} catch(IOException ioe) {
IOUtils.cleanupWithLogger(null, raf);
throw ioe;
}
}
/**
* Find the meta-file for the specified block file and then return the
* generation stamp from the name of the meta-file. Generally meta file will
@ -183,4 +199,15 @@ public class FsDatasetUtil {
FsDatasetImpl.computeChecksum(wrapper, dstMeta, smallBufferSize, conf);
}
public static void deleteMappedFile(String filePath) throws IOException {
if (filePath == null) {
throw new IOException("The filePath should not be null!");
}
boolean result = Files.deleteIfExists(Paths.get(filePath));
if (!result) {
throw new IOException(
"Failed to delete the mapped file: " + filePath);
}
}
}

View File

@ -34,6 +34,11 @@ import java.nio.channels.FileChannel;
@InterfaceStability.Unstable
public abstract class MappableBlockLoader {
/**
* Initialize a specific MappableBlockLoader.
*/
abstract void initialize(FsDatasetCache cacheManager) throws IOException;
/**
* Load the block.
*
@ -60,23 +65,47 @@ public abstract class MappableBlockLoader {
/**
* Try to reserve some given bytes.
*
* @param bytesCount
* The number of bytes to add.
* @param bytesCount The number of bytes to add.
*
* @return The new number of usedBytes if we succeeded; -1 if we failed.
* @return The new number of usedBytes if we succeeded;
* -1 if we failed.
*/
abstract long reserve(long bytesCount);
/**
* Release some bytes that we're using.
*
* @param bytesCount
* The number of bytes to release.
* @param bytesCount The number of bytes to release.
*
* @return The new number of usedBytes.
* @return The new number of usedBytes.
*/
abstract long release(long bytesCount);
/**
* Get the config key of cache capacity.
*/
abstract String getCacheCapacityConfigKey();
/**
* Get the approximate amount of cache space used.
*/
abstract long getCacheUsed();
/**
* Get the maximum amount of cache bytes.
*/
abstract long getCacheCapacity();
/**
* Check whether the cache is non-volatile.
*/
abstract boolean isTransientCache();
/**
* Get a cache file path if applicable. Otherwise return null.
*/
abstract String getCachedPath(ExtendedBlockId key);
/**
* Reads bytes into a buffer until EOF or the buffer's limit is reached.
*/

View File

@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.io.nativeio.NativeIO;
@ -42,16 +43,11 @@ import java.nio.channels.FileChannel;
@InterfaceStability.Unstable
public class MemoryMappableBlockLoader extends MappableBlockLoader {
private final FsDatasetCache cacheManager;
private MemoryCacheStats memCacheStats;
/**
* Constructs memory mappable loader.
*
* @param cacheManager
* FsDatasetCache reference.
*/
MemoryMappableBlockLoader(FsDatasetCache cacheManager) {
this.cacheManager = cacheManager;
@Override
void initialize(FsDatasetCache cacheManager) throws IOException {
this.memCacheStats = cacheManager.getMemCacheStats();
}
/**
@ -72,7 +68,7 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
* @return The Mappable block.
*/
@Override
public MappableBlock load(long length, FileInputStream blockIn,
MappableBlock load(long length, FileInputStream blockIn,
FileInputStream metaIn, String blockFileName,
ExtendedBlockId key)
throws IOException {
@ -152,13 +148,38 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
}
}
@Override
public String getCacheCapacityConfigKey() {
return DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
}
@Override
public long getCacheUsed() {
return memCacheStats.getCacheUsed();
}
@Override
public long getCacheCapacity() {
return memCacheStats.getCacheCapacity();
}
@Override
long reserve(long bytesCount) {
return cacheManager.reserve(bytesCount);
return memCacheStats.reserve(bytesCount);
}
@Override
long release(long bytesCount) {
return cacheManager.release(bytesCount);
return memCacheStats.release(bytesCount);
}
@Override
public boolean isTransientCache() {
return true;
}
@Override
public String getCachedPath(ExtendedBlockId key) {
return null;
}
}

View File

@ -0,0 +1,230 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
/**
* Maps block to persistent memory by using mapped byte buffer.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class PmemMappableBlockLoader extends MappableBlockLoader {
private static final Logger LOG =
LoggerFactory.getLogger(PmemMappableBlockLoader.class);
private PmemVolumeManager pmemVolumeManager;
@Override
void initialize(FsDatasetCache cacheManager) throws IOException {
DNConf dnConf = cacheManager.getDnConf();
this.pmemVolumeManager = new PmemVolumeManager(dnConf.getMaxLockedPmem(),
dnConf.getPmemVolumes());
}
@VisibleForTesting
PmemVolumeManager getPmemVolumeManager() {
return pmemVolumeManager;
}
/**
* Load the block.
*
* Map the block and verify its checksum.
*
* The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
* is a persistent memory volume selected by getOneLocation() method.
*
* @param length The current length of the block.
* @param blockIn The block input stream. Should be positioned at the
* start. The caller must close this.
* @param metaIn The meta file input stream. Should be positioned at
* the start. The caller must close this.
* @param blockFileName The block file name, for logging purposes.
* @param key The extended block ID.
*
* @throws IOException If mapping block fails or checksum fails.
*
* @return The Mappable block.
*/
@Override
MappableBlock load(long length, FileInputStream blockIn,
FileInputStream metaIn, String blockFileName,
ExtendedBlockId key)
throws IOException {
PmemMappedBlock mappableBlock = null;
String filePath = null;
FileChannel blockChannel = null;
RandomAccessFile file = null;
MappedByteBuffer out = null;
try {
blockChannel = blockIn.getChannel();
if (blockChannel == null) {
throw new IOException("Block InputStream has no FileChannel.");
}
Byte volumeIndex = pmemVolumeManager.getOneVolumeIndex();
filePath = pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
file = new RandomAccessFile(filePath, "rw");
out = file.getChannel().
map(FileChannel.MapMode.READ_WRITE, 0, length);
if (out == null) {
throw new IOException("Failed to map the block " + blockFileName +
" to persistent storage.");
}
verifyChecksumAndMapBlock(out, length, metaIn, blockChannel,
blockFileName);
mappableBlock = new PmemMappedBlock(
length, volumeIndex, key, pmemVolumeManager);
pmemVolumeManager.afterCache(key, volumeIndex);
LOG.info("Successfully cached one replica:{} into persistent memory"
+ ", [cached path={}, length={}]", key, filePath, length);
} finally {
IOUtils.closeQuietly(blockChannel);
if (out != null) {
NativeIO.POSIX.munmap(out);
}
IOUtils.closeQuietly(file);
if (mappableBlock == null) {
FsDatasetUtil.deleteMappedFile(filePath);
}
}
return mappableBlock;
}
/**
* Verifies the block's checksum meanwhile maps block to persistent memory.
* This is an I/O intensive operation.
*/
private void verifyChecksumAndMapBlock(
MappedByteBuffer out, long length, FileInputStream metaIn,
FileChannel blockChannel, String blockFileName)
throws IOException {
// Verify the checksum from the block's meta file
// Get the DataChecksum from the meta file header
BlockMetadataHeader header =
BlockMetadataHeader.readHeader(new DataInputStream(
new BufferedInputStream(metaIn, BlockMetadataHeader
.getHeaderSize())));
FileChannel metaChannel = null;
try {
metaChannel = metaIn.getChannel();
if (metaChannel == null) {
throw new IOException("Cannot get FileChannel from " +
"Block InputStream meta file.");
}
DataChecksum checksum = header.getChecksum();
final int bytesPerChecksum = checksum.getBytesPerChecksum();
final int checksumSize = checksum.getChecksumSize();
final int numChunks = (8 * 1024 * 1024) / bytesPerChecksum;
ByteBuffer blockBuf = ByteBuffer.allocate(numChunks * bytesPerChecksum);
ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
// Verify the checksum
int bytesVerified = 0;
while (bytesVerified < length) {
Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
"Unexpected partial chunk before EOF");
assert bytesVerified % bytesPerChecksum == 0;
int bytesRead = fillBuffer(blockChannel, blockBuf);
if (bytesRead == -1) {
throw new IOException(
"Checksum verification failed for the block " + blockFileName +
": premature EOF");
}
blockBuf.flip();
// Number of read chunks, including partial chunk at end
int chunks = (bytesRead + bytesPerChecksum - 1) / bytesPerChecksum;
checksumBuf.limit(chunks * checksumSize);
fillBuffer(metaChannel, checksumBuf);
checksumBuf.flip();
checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
bytesVerified);
// / Copy data to persistent file
out.put(blockBuf);
// positioning the
bytesVerified += bytesRead;
// Clear buffer
blockBuf.clear();
checksumBuf.clear();
}
// Forces to write data to storage device containing the mapped file
out.force();
} finally {
IOUtils.closeQuietly(metaChannel);
}
}
@Override
public String getCacheCapacityConfigKey() {
return DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
}
@Override
public long getCacheUsed() {
return pmemVolumeManager.getCacheUsed();
}
@Override
public long getCacheCapacity() {
return pmemVolumeManager.getCacheCapacity();
}
@Override
long reserve(long bytesCount) {
return pmemVolumeManager.reserve(bytesCount);
}
@Override
long release(long bytesCount) {
return pmemVolumeManager.release(bytesCount);
}
@Override
public boolean isTransientCache() {
return false;
}
@Override
public String getCachedPath(ExtendedBlockId key) {
return pmemVolumeManager.getCacheFilePath(key);
}
}

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Represents an HDFS block that is mapped to persistent memory by DataNode
* with mapped byte buffer. PMDK is NOT involved in this implementation.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class PmemMappedBlock implements MappableBlock {
private static final Logger LOG =
LoggerFactory.getLogger(PmemMappedBlock.class);
private final PmemVolumeManager pmemVolumeManager;
private long length;
private Byte volumeIndex = null;
private ExtendedBlockId key;
PmemMappedBlock(long length, Byte volumeIndex, ExtendedBlockId key,
PmemVolumeManager pmemVolumeManager) {
assert length > 0;
this.length = length;
this.volumeIndex = volumeIndex;
this.key = key;
this.pmemVolumeManager = pmemVolumeManager;
}
@Override
public long getLength() {
return length;
}
@Override
public void close() {
String cacheFilePath =
pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
try {
FsDatasetUtil.deleteMappedFile(cacheFilePath);
pmemVolumeManager.afterUncache(key);
LOG.info("Successfully uncached one replica:{} from persistent memory"
+ ", [cached path={}, length={}]", key, cacheFilePath, length);
} catch (IOException e) {
LOG.warn("Failed to delete the mapped File: {}!", cacheFilePath, e);
}
}
}

View File

@ -0,0 +1,306 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* Manage the persistent memory volumes.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class PmemVolumeManager {
/**
* Counts used bytes for persistent memory.
*/
private class UsedBytesCount {
private final AtomicLong usedBytes = new AtomicLong(0);
/**
* Try to reserve more bytes.
*
* @param bytesCount The number of bytes to add.
*
* @return The new number of usedBytes if we succeeded;
* -1 if we failed.
*/
long reserve(long bytesCount) {
while (true) {
long cur = usedBytes.get();
long next = cur + bytesCount;
if (next > cacheCapacity) {
return -1;
}
if (usedBytes.compareAndSet(cur, next)) {
return next;
}
}
}
/**
* Release some bytes that we're using.
*
* @param bytesCount The number of bytes to release.
*
* @return The new number of usedBytes.
*/
long release(long bytesCount) {
return usedBytes.addAndGet(-bytesCount);
}
long get() {
return usedBytes.get();
}
}
private static final Logger LOG =
LoggerFactory.getLogger(PmemVolumeManager.class);
private final ArrayList<String> pmemVolumes = new ArrayList<>();
// Maintain which pmem volume a block is cached to.
private final Map<ExtendedBlockId, Byte> blockKeyToVolume =
new ConcurrentHashMap<>();
private final UsedBytesCount usedBytesCount;
/**
* The total cache capacity in bytes of persistent memory.
* It is 0L if the specific mappableBlockLoader couldn't cache data to pmem.
*/
private final long cacheCapacity;
private int count = 0;
// Strict atomic operation is not guaranteed for the performance sake.
private int i = 0;
PmemVolumeManager(long maxBytes, String[] pmemVolumesConfigured)
throws IOException {
if (pmemVolumesConfigured == null || pmemVolumesConfigured.length == 0) {
throw new IOException("The persistent memory volume, " +
DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY +
" is not configured!");
}
this.loadVolumes(pmemVolumesConfigured);
this.usedBytesCount = new UsedBytesCount();
this.cacheCapacity = maxBytes;
}
public long getCacheUsed() {
return usedBytesCount.get();
}
public long getCacheCapacity() {
return cacheCapacity;
}
/**
* Try to reserve more bytes on persistent memory.
*
* @param bytesCount The number of bytes to add.
*
* @return The new number of usedBytes if we succeeded;
* -1 if we failed.
*/
long reserve(long bytesCount) {
return usedBytesCount.reserve(bytesCount);
}
/**
* Release some bytes that we're using on persistent memory.
*
* @param bytesCount The number of bytes to release.
*
* @return The new number of usedBytes.
*/
long release(long bytesCount) {
return usedBytesCount.release(bytesCount);
}
/**
* Load and verify the configured pmem volumes.
*
* @throws IOException If there is no available pmem volume.
*/
private void loadVolumes(String[] volumes) throws IOException {
// Check whether the volume exists
for (String volume: volumes) {
try {
File pmemDir = new File(volume);
verifyIfValidPmemVolume(pmemDir);
// Remove all files under the volume.
FileUtils.cleanDirectory(pmemDir);
} catch (IllegalArgumentException e) {
LOG.error("Failed to parse persistent memory volume " + volume, e);
continue;
} catch (IOException e) {
LOG.error("Bad persistent memory volume: " + volume, e);
continue;
}
pmemVolumes.add(volume);
LOG.info("Added persistent memory - " + volume);
}
count = pmemVolumes.size();
if (count == 0) {
throw new IOException(
"At least one valid persistent memory volume is required!");
}
}
@VisibleForTesting
static void verifyIfValidPmemVolume(File pmemDir)
throws IOException {
if (!pmemDir.exists()) {
final String message = pmemDir + " does not exist";
throw new IOException(message);
}
if (!pmemDir.isDirectory()) {
final String message = pmemDir + " is not a directory";
throw new IllegalArgumentException(message);
}
String uuidStr = UUID.randomUUID().toString();
String testFilePath = pmemDir.getPath() + "/.verify.pmem." + uuidStr;
byte[] contents = uuidStr.getBytes("UTF-8");
RandomAccessFile testFile = null;
MappedByteBuffer out = null;
try {
testFile = new RandomAccessFile(testFilePath, "rw");
out = testFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0,
contents.length);
if (out == null) {
throw new IOException("Failed to map the test file under " + pmemDir);
}
out.put(contents);
// Forces to write data to storage device containing the mapped file
out.force();
} catch (IOException e) {
throw new IOException(
"Exception while writing data to persistent storage dir: " +
pmemDir, e);
} finally {
if (out != null) {
out.clear();
}
if (testFile != null) {
IOUtils.closeQuietly(testFile);
NativeIO.POSIX.munmap(out);
try {
FsDatasetUtil.deleteMappedFile(testFilePath);
} catch (IOException e) {
LOG.warn("Failed to delete test file " + testFilePath +
" from persistent memory", e);
}
}
}
}
/**
* Choose a persistent memory volume based on a specific algorithm.
* Currently it is a round-robin policy.
*
* TODO: Refine volume selection policy by considering storage utilization.
*/
Byte getOneVolumeIndex() throws IOException {
if (count != 0) {
return (byte)(i++ % count);
} else {
throw new IOException("No usable persistent memory is found");
}
}
@VisibleForTesting
String getVolumeByIndex(Byte index) {
return pmemVolumes.get(index);
}
/**
* The cache file is named as BlockPoolId-BlockId.
* So its name can be inferred by BlockPoolId and BlockId.
*/
public String getCacheFileName(ExtendedBlockId key) {
return key.getBlockPoolId() + "-" + key.getBlockId();
}
/**
* Considering the pmem volume size is below TB level currently,
* it is tolerable to keep cache files under one directory.
* The strategy will be optimized, especially if one pmem volume
* has huge cache capacity.
*
* @param volumeIndex The index of pmem volume where a replica will be
* cached to or has been cached to.
*
* @param key The replica's ExtendedBlockId.
*
* @return A path to which the block replica is mapped.
*/
public String inferCacheFilePath(Byte volumeIndex, ExtendedBlockId key) {
return pmemVolumes.get(volumeIndex) + "/" + getCacheFileName(key);
}
/**
* The cache file path is pmemVolume/BlockPoolId-BlockId.
*/
public String getCacheFilePath(ExtendedBlockId key) {
Byte volumeIndex = blockKeyToVolume.get(key);
if (volumeIndex == null) {
return null;
}
return inferCacheFilePath(volumeIndex, key);
}
@VisibleForTesting
Map<ExtendedBlockId, Byte> getBlockKeyToVolume() {
return blockKeyToVolume;
}
/**
* Add cached block's ExtendedBlockId and its cache volume index to a map
* after cache.
*/
public void afterCache(ExtendedBlockId key, Byte volumeIndex) {
blockKeyToVolume.put(key, volumeIndex);
}
/**
* Remove the record in blockKeyToVolume for uncached block after uncache.
*/
public void afterUncache(ExtendedBlockId key) {
blockKeyToVolume.remove(key);
}
}

View File

@ -2526,6 +2526,18 @@
</description>
</property>
<property>
<name>dfs.datanode.cache.loader.class</name>
<value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader</value>
<description>
Currently, the available cache loaders are MemoryMappableBlockLoader, PmemMappableBlockLoader.
By default, MemoryMappableBlockLoader is used and it maps block replica into memory.
PmemMappableBlockLoader can map block to persistent memory with mapped byte buffer, which is
implemented by Java code. The value of dfs.datanode.cache.pmem.dirs specifies the persistent
memory directory.
</description>
</property>
<property>
<name>dfs.datanode.max.locked.memory</name>
<value>0</value>
@ -2543,6 +2555,28 @@
</description>
</property>
<property>
<name>dfs.datanode.cache.pmem.capacity</name>
<value>0</value>
<description>
The amount of persistent memory in bytes that can be used to cache block
replicas to persistent memory. Currently, persistent memory is only enabled
in HDFS Centralized Cache Management feature.
By default, this parameter is 0, which disables persistent memory caching.
</description>
</property>
<property>
<name>dfs.datanode.cache.pmem.dirs</name>
<value></value>
<description>
This value specifies the persistent memory directory used for caching block
replica. It matters only if the value of dfs.datanode.cache.loader.class is
PmemMappableBlockLoader. Multiple directories separated by "," are acceptable.
</description>
</property>
<property>
<name>dfs.namenode.list.cache.directives.num.responses</name>
<value>100</value>

View File

@ -0,0 +1,329 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.event.Level;
import com.google.common.base.Supplier;
import com.google.common.primitives.Ints;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY;
/**
* Tests HDFS persistent memory cache by PmemMappableBlockLoader.
*
* Bogus persistent memory volume is used to cache blocks.
*/
public class TestCacheByPmemMappableBlockLoader {
protected static final org.slf4j.Logger LOG =
LoggerFactory.getLogger(TestCacheByPmemMappableBlockLoader.class);
protected static final long CACHE_CAPACITY = 64 * 1024;
protected static final long BLOCK_SIZE = 4 * 1024;
private static Configuration conf;
private static MiniDFSCluster cluster = null;
private static DistributedFileSystem fs;
private static DataNode dn;
private static FsDatasetCache cacheManager;
private static PmemMappableBlockLoader cacheLoader;
/**
* Used to pause DN BPServiceActor threads. BPSA threads acquire the
* shared read lock. The test acquires the write lock for exclusive access.
*/
private static ReadWriteLock lock = new ReentrantReadWriteLock(true);
private static CacheManipulator prevCacheManipulator;
private static DataNodeFaultInjector oldInjector;
private static final String PMEM_DIR_0 =
MiniDFSCluster.getBaseDirectory() + "pmem0";
private static final String PMEM_DIR_1 =
MiniDFSCluster.getBaseDirectory() + "pmem1";
static {
GenericTestUtils.setLogLevel(
LoggerFactory.getLogger(FsDatasetCache.class), Level.DEBUG);
}
@BeforeClass
public static void setUpClass() throws Exception {
oldInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
@Override
public void startOfferService() throws Exception {
lock.readLock().lock();
}
@Override
public void endOfferService() throws Exception {
lock.readLock().unlock();
}
});
}
@AfterClass
public static void tearDownClass() throws Exception {
DataNodeFaultInjector.set(oldInjector);
}
@Before
public void setUp() throws Exception {
conf = new HdfsConfiguration();
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
CACHE_CAPACITY);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
// Configuration for pmem cache
conf.set(DFS_DATANODE_CACHE_LOADER_CLASS,
"org.apache.hadoop.hdfs.server.datanode." +
"fsdataset.impl.PmemMappableBlockLoader");
new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
// Configure two bogus pmem volumes
conf.set(DFS_DATANODE_CACHE_PMEM_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
conf.setLong(DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY, CACHE_CAPACITY);
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
dn = cluster.getDataNodes().get(0);
cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager;
cacheLoader = (PmemMappableBlockLoader) cacheManager.getCacheLoader();
}
@After
public void tearDown() throws Exception {
if (fs != null) {
fs.close();
fs = null;
}
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
}
protected static void shutdownCluster() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
@Test
public void testPmemVolumeManager() throws IOException {
PmemVolumeManager pmemVolumeManager =
cacheLoader.getPmemVolumeManager();
assertNotNull(pmemVolumeManager);
assertEquals(CACHE_CAPACITY, pmemVolumeManager.getCacheCapacity());
// Test round-robin selection policy
long count1 = 0, count2 = 0;
for (int i = 0; i < 10; i++) {
Byte index = pmemVolumeManager.getOneVolumeIndex();
String volume = pmemVolumeManager.getVolumeByIndex(index);
if (volume.equals(PMEM_DIR_0)) {
count1++;
} else if (volume.equals(PMEM_DIR_1)) {
count2++;
} else {
fail("Unexpected persistent storage location:" + volume);
}
}
assertEquals(count1, count2);
}
public List<ExtendedBlockId> getExtendedBlockId(Path filePath, long fileLen)
throws IOException {
List<ExtendedBlockId> keys = new ArrayList<>();
HdfsBlockLocation[] locs =
(HdfsBlockLocation[]) fs.getFileBlockLocations(filePath, 0, fileLen);
for (HdfsBlockLocation loc : locs) {
long bkid = loc.getLocatedBlock().getBlock().getBlockId();
String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId();
keys.add(new ExtendedBlockId(bkid, bpid));
}
return keys;
}
@Test(timeout = 60000)
public void testCacheAndUncache() throws Exception {
final int maxCacheBlocksNum =
Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE);
BlockReaderTestUtil.enableHdfsCachingTracing();
Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE);
assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheCapacity());
final Path testFile = new Path("/testFile");
final long testFileLen = maxCacheBlocksNum * BLOCK_SIZE;
DFSTestUtil.createFile(fs, testFile,
testFileLen, (short) 1, 0xbeef);
List<ExtendedBlockId> blockKeys =
getExtendedBlockId(testFile, testFileLen);
fs.addCachePool(new CachePoolInfo("testPool"));
final long cacheDirectiveId = fs.addCacheDirective(
new CacheDirectiveInfo.Builder().setPool("testPool").
setPath(testFile).setReplication((short) 1).build());
// wait for caching
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
long blocksCached =
MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
if (blocksCached != maxCacheBlocksNum) {
LOG.info("waiting for " + maxCacheBlocksNum + " blocks to " +
"be cached. Right now " + blocksCached + " blocks are cached.");
return false;
}
LOG.info(maxCacheBlocksNum + " blocks are now cached.");
return true;
}
}, 1000, 30000);
// The pmem cache space is expected to have been used up.
assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheUsed());
Map<ExtendedBlockId, Byte> blockKeyToVolume =
cacheLoader.getPmemVolumeManager().getBlockKeyToVolume();
// All block keys should be kept in blockKeyToVolume
assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum);
assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
// Test each replica's cache file path
for (ExtendedBlockId key : blockKeys) {
String cachePath = cacheManager.
getReplicaCachePath(key.getBlockPoolId(), key.getBlockId());
// The cachePath shouldn't be null if the replica has been cached
// to pmem.
assertNotNull(cachePath);
String expectFileName =
cacheLoader.getPmemVolumeManager().getCacheFileName(key);
if (cachePath.startsWith(PMEM_DIR_0)) {
assertTrue(cachePath.equals(PMEM_DIR_0 + "/" + expectFileName));
} else if (cachePath.startsWith(PMEM_DIR_1)) {
assertTrue(cachePath.equals(PMEM_DIR_1 + "/" + expectFileName));
} else {
fail("The cache path is not the expected one: " + cachePath);
}
}
// Try to cache another file. Caching this file should fail
// due to lack of available cache space.
final Path smallTestFile = new Path("/smallTestFile");
final long smallTestFileLen = BLOCK_SIZE;
DFSTestUtil.createFile(fs, smallTestFile,
smallTestFileLen, (short) 1, 0xbeef);
// Try to cache more blocks when no cache space is available.
final long smallFileCacheDirectiveId = fs.addCacheDirective(
new CacheDirectiveInfo.Builder().setPool("testPool").
setPath(smallTestFile).setReplication((short) 1).build());
// Wait for enough time to verify smallTestFile could not be cached.
Thread.sleep(10000);
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
long blocksCached =
MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
// The cached block num should not be increased.
assertTrue(blocksCached == maxCacheBlocksNum);
// The blockKeyToVolume should just keep the block keys for the testFile.
assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum);
assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
// Stop trying to cache smallTestFile to avoid interfering the
// verification for uncache functionality.
fs.removeCacheDirective(smallFileCacheDirectiveId);
// Uncache the test file
fs.removeCacheDirective(cacheDirectiveId);
// Wait for uncaching
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
long blocksUncached =
MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
if (blocksUncached != maxCacheBlocksNum) {
LOG.info("waiting for " + maxCacheBlocksNum + " blocks to be " +
"uncached. Right now " + blocksUncached +
" blocks are uncached.");
return false;
}
LOG.info(maxCacheBlocksNum + " blocks have been uncached.");
return true;
}
}, 1000, 30000);
// It is expected that no pmem cache space is used.
assertEquals(0, cacheManager.getPmemCacheUsed());
// No record should be kept by blockKeyToVolume after testFile is uncached.
assertEquals(blockKeyToVolume.size(), 0);
}
}