HDFS-14401. Refine the implementation for HDFS cache on SCM. Contributed by Feilong He.

This commit is contained in:
Rakesh Radhakrishnan 2019-05-08 17:20:21 +05:30
parent b00f9d89e4
commit 3b2a7aabf7
13 changed files with 256 additions and 202 deletions

View File

@ -25,8 +25,6 @@ import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReservedSpaceCalculator;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
@ -382,22 +380,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_CACHE_REVOCATION_POLLING_MS = "dfs.datanode.cache.revocation.polling.ms";
public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L;
// Currently, the available cache loaders are MemoryMappableBlockLoader,
// PmemMappableBlockLoader. MemoryMappableBlockLoader is the default cache
// loader to cache block replica to memory.
public static final String DFS_DATANODE_CACHE_LOADER_CLASS =
"dfs.datanode.cache.loader.class";
public static final Class<? extends MappableBlockLoader>
DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT =
MemoryMappableBlockLoader.class;
// Multiple dirs separated by "," are acceptable.
public static final String DFS_DATANODE_CACHE_PMEM_DIRS_KEY =
"dfs.datanode.cache.pmem.dirs";
public static final String DFS_DATANODE_CACHE_PMEM_DIRS_DEFAULT = "";
// The cache capacity of persistent memory
public static final String DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY =
"dfs.datanode.cache.pmem.capacity";
public static final long DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT = 0L;
public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;

View File

@ -27,10 +27,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHO
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
@ -71,7 +67,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
import org.apache.hadoop.security.SaslPropertiesResolver;
import java.util.concurrent.TimeUnit;
@ -121,9 +116,7 @@ public class DNConf {
final long xceiverStopTimeout;
final long restartReplicaExpiry;
private final Class<? extends MappableBlockLoader> cacheLoaderClass;
final long maxLockedMemory;
private final long maxLockedPmem;
private final String[] pmemDirs;
private final long bpReadyTimeout;
@ -266,17 +259,10 @@ public class DNConf {
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
this.cacheLoaderClass = getConf().getClass(DFS_DATANODE_CACHE_LOADER_CLASS,
DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT, MappableBlockLoader.class);
this.maxLockedMemory = getConf().getLong(
DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
this.maxLockedPmem = getConf().getLong(
DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY,
DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT);
this.pmemDirs = getConf().getTrimmedStrings(
DFS_DATANODE_CACHE_PMEM_DIRS_KEY);
@ -342,10 +328,6 @@ public class DNConf {
return maxLockedMemory;
}
public long getMaxLockedPmem() {
return maxLockedPmem;
}
/**
* Returns true if connect to datanode via hostname
*
@ -449,10 +431,6 @@ public class DNConf {
return maxDataLength;
}
public Class<? extends MappableBlockLoader> getCacheLoaderClass() {
return cacheLoaderClass;
}
public String[] getPmemVolumes() {
return pmemDirs;
}

View File

@ -53,7 +53,6 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -183,9 +182,8 @@ public class FsDatasetCache {
this.memCacheStats = new MemoryCacheStats(
dataset.datanode.getDnConf().getMaxLockedMemory());
Class<? extends MappableBlockLoader> cacheLoaderClass =
dataset.datanode.getDnConf().getCacheLoaderClass();
this.cacheLoader = ReflectionUtils.newInstance(cacheLoaderClass, null);
this.cacheLoader = MappableBlockLoaderFactory.createCacheLoader(
this.getDnConf());
cacheLoader.initialize(this);
}
@ -213,7 +211,7 @@ public class FsDatasetCache {
return null;
}
ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
return cacheLoader.getCachedPath(key);
return PmemVolumeManager.getInstance().getCachePath(key);
}
/**
@ -380,14 +378,13 @@ public class FsDatasetCache {
MappableBlock mappableBlock = null;
ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
key.getBlockId(), length, genstamp);
long newUsedBytes = cacheLoader.reserve(length);
long newUsedBytes = cacheLoader.reserve(key, length);
boolean reservedBytes = false;
try {
if (newUsedBytes < 0) {
LOG.warn("Failed to cache " + key + ": could not reserve " + length +
" more bytes in the cache: " +
cacheLoader.getCacheCapacityConfigKey() +
" of " + cacheLoader.getCacheCapacity() + " exceeded.");
LOG.warn("Failed to cache " + key + ": could not reserve " +
"more bytes in the cache: " + cacheLoader.getCacheCapacity() +
" exceeded when try to reserve " + length + "bytes.");
return;
}
reservedBytes = true;
@ -442,10 +439,10 @@ public class FsDatasetCache {
IOUtils.closeQuietly(metaIn);
if (!success) {
if (reservedBytes) {
cacheLoader.release(length);
cacheLoader.release(key, length);
}
LOG.debug("Caching of {} was aborted. We are now caching only {} "
+ "bytes in total.", key, memCacheStats.getCacheUsed());
+ "bytes in total.", key, cacheLoader.getCacheUsed());
IOUtils.closeQuietly(mappableBlock);
numBlocksFailedToCache.incrementAndGet();
@ -519,7 +516,8 @@ public class FsDatasetCache {
synchronized (FsDatasetCache.this) {
mappableBlockMap.remove(key);
}
long newUsedBytes = cacheLoader.release(value.mappableBlock.getLength());
long newUsedBytes = cacheLoader.
release(key, value.mappableBlock.getLength());
numBlocksCached.addAndGet(-1);
dataset.datanode.getMetrics().incrBlocksUncached(1);
if (revocationTimeMs != 0) {
@ -592,4 +590,11 @@ public class FsDatasetCache {
MappableBlockLoader getCacheLoader() {
return cacheLoader;
}
/**
* This method can be executed during DataNode shutdown.
*/
void shutdown() {
cacheLoader.shutdown();
}
}

View File

@ -2340,6 +2340,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
"from LazyWriter.join");
}
}
cacheManager.shutdown();
}
@Override // FSDatasetMBean

View File

@ -65,26 +65,25 @@ public abstract class MappableBlockLoader {
/**
* Try to reserve some given bytes.
*
* @param key The ExtendedBlockId for a block.
*
* @param bytesCount The number of bytes to add.
*
* @return The new number of usedBytes if we succeeded;
* -1 if we failed.
*/
abstract long reserve(long bytesCount);
abstract long reserve(ExtendedBlockId key, long bytesCount);
/**
* Release some bytes that we're using.
*
* @param key The ExtendedBlockId for a block.
*
* @param bytesCount The number of bytes to release.
*
* @return The new number of usedBytes.
*/
abstract long release(long bytesCount);
/**
* Get the config key of cache capacity.
*/
abstract String getCacheCapacityConfigKey();
abstract long release(ExtendedBlockId key, long bytesCount);
/**
* Get the approximate amount of cache space used.
@ -102,9 +101,11 @@ public abstract class MappableBlockLoader {
abstract boolean isTransientCache();
/**
* Get a cache file path if applicable. Otherwise return null.
* Clean up cache, can be used during DataNode shutdown.
*/
abstract String getCachedPath(ExtendedBlockId key);
void shutdown() {
// Do nothing.
}
/**
* Reads bytes into a buffer until EOF or the buffer's limit is reached.

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
/**
* Creates MappableBlockLoader.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class MappableBlockLoaderFactory {
private MappableBlockLoaderFactory() {
// Prevent instantiation
}
/**
* Create a specific cache loader according to the configuration.
* If persistent memory volume is not configured, return a cache loader
* for DRAM cache. Otherwise, return a cache loader for pmem cache.
*/
public static MappableBlockLoader createCacheLoader(DNConf conf) {
if (conf.getPmemVolumes() == null || conf.getPmemVolumes().length == 0) {
return new MemoryMappableBlockLoader();
}
return new PmemMappableBlockLoader();
}
}

View File

@ -22,11 +22,12 @@ import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
@ -42,11 +43,13 @@ import java.nio.channels.FileChannel;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class MemoryMappableBlockLoader extends MappableBlockLoader {
private static final Logger LOG =
LoggerFactory.getLogger(MemoryMappableBlockLoader.class);
private MemoryCacheStats memCacheStats;
@Override
void initialize(FsDatasetCache cacheManager) throws IOException {
LOG.info("Initializing cache loader: MemoryMappableBlockLoader.");
this.memCacheStats = cacheManager.getMemCacheStats();
}
@ -148,11 +151,6 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
}
}
@Override
public String getCacheCapacityConfigKey() {
return DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
}
@Override
public long getCacheUsed() {
return memCacheStats.getCacheUsed();
@ -164,12 +162,12 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
}
@Override
long reserve(long bytesCount) {
long reserve(ExtendedBlockId key, long bytesCount) {
return memCacheStats.reserve(bytesCount);
}
@Override
long release(long bytesCount) {
long release(ExtendedBlockId key, long bytesCount) {
return memCacheStats.release(bytesCount);
}
@ -177,9 +175,4 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
public boolean isTransientCache() {
return true;
}
@Override
public String getCachedPath(ExtendedBlockId key) {
return null;
}
}

View File

@ -18,12 +18,10 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
@ -53,14 +51,10 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
@Override
void initialize(FsDatasetCache cacheManager) throws IOException {
LOG.info("Initializing cache loader: PmemMappableBlockLoader.");
DNConf dnConf = cacheManager.getDnConf();
this.pmemVolumeManager = new PmemVolumeManager(dnConf.getMaxLockedPmem(),
dnConf.getPmemVolumes());
}
@VisibleForTesting
PmemVolumeManager getPmemVolumeManager() {
return pmemVolumeManager;
PmemVolumeManager.init(dnConf.getPmemVolumes());
pmemVolumeManager = PmemVolumeManager.getInstance();
}
/**
@ -69,7 +63,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
* Map the block and verify its checksum.
*
* The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
* is a persistent memory volume selected by getOneLocation() method.
* is a persistent memory volume chosen by PmemVolumeManager.
*
* @param length The current length of the block.
* @param blockIn The block input stream. Should be positioned at the
@ -100,8 +94,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
throw new IOException("Block InputStream has no FileChannel.");
}
Byte volumeIndex = pmemVolumeManager.getOneVolumeIndex();
filePath = pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
filePath = pmemVolumeManager.getCachePath(key);
file = new RandomAccessFile(filePath, "rw");
out = file.getChannel().
map(FileChannel.MapMode.READ_WRITE, 0, length);
@ -111,9 +104,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
}
verifyChecksumAndMapBlock(out, length, metaIn, blockChannel,
blockFileName);
mappableBlock = new PmemMappedBlock(
length, volumeIndex, key, pmemVolumeManager);
pmemVolumeManager.afterCache(key, volumeIndex);
mappableBlock = new PmemMappedBlock(length, key);
LOG.info("Successfully cached one replica:{} into persistent memory"
+ ", [cached path={}, length={}]", key, filePath, length);
} finally {
@ -123,6 +114,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
}
IOUtils.closeQuietly(file);
if (mappableBlock == null) {
LOG.debug("Delete {} due to unsuccessful mapping.", filePath);
FsDatasetUtil.deleteMappedFile(filePath);
}
}
@ -193,11 +185,6 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
}
}
@Override
public String getCacheCapacityConfigKey() {
return DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
}
@Override
public long getCacheUsed() {
return pmemVolumeManager.getCacheUsed();
@ -209,13 +196,13 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
}
@Override
long reserve(long bytesCount) {
return pmemVolumeManager.reserve(bytesCount);
long reserve(ExtendedBlockId key, long bytesCount) {
return pmemVolumeManager.reserve(key, bytesCount);
}
@Override
long release(long bytesCount) {
return pmemVolumeManager.release(bytesCount);
long release(ExtendedBlockId key, long bytesCount) {
return pmemVolumeManager.release(key, bytesCount);
}
@Override
@ -224,7 +211,8 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
}
@Override
public String getCachedPath(ExtendedBlockId key) {
return pmemVolumeManager.getCacheFilePath(key);
void shutdown() {
LOG.info("Clean up cache on persistent memory during shutdown.");
PmemVolumeManager.getInstance().cleanup();
}
}

View File

@ -35,18 +35,13 @@ import java.io.IOException;
public class PmemMappedBlock implements MappableBlock {
private static final Logger LOG =
LoggerFactory.getLogger(PmemMappedBlock.class);
private final PmemVolumeManager pmemVolumeManager;
private long length;
private Byte volumeIndex = null;
private ExtendedBlockId key;
PmemMappedBlock(long length, Byte volumeIndex, ExtendedBlockId key,
PmemVolumeManager pmemVolumeManager) {
PmemMappedBlock(long length, ExtendedBlockId key) {
assert length > 0;
this.length = length;
this.volumeIndex = volumeIndex;
this.key = key;
this.pmemVolumeManager = pmemVolumeManager;
}
@Override
@ -57,10 +52,9 @@ public class PmemMappedBlock implements MappableBlock {
@Override
public void close() {
String cacheFilePath =
pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
PmemVolumeManager.getInstance().getCachePath(key);
try {
FsDatasetUtil.deleteMappedFile(cacheFilePath);
pmemVolumeManager.afterUncache(key);
LOG.info("Successfully uncached one replica:{} from persistent memory"
+ ", [cached path={}, length={}]", key, cacheFilePath, length);
} catch (IOException e) {

View File

@ -35,6 +35,7 @@ import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@ -45,14 +46,19 @@ import java.util.concurrent.atomic.AtomicLong;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class PmemVolumeManager {
public final class PmemVolumeManager {
/**
* Counts used bytes for persistent memory.
*/
private class UsedBytesCount {
private static class UsedBytesCount {
private final long maxBytes;
private final AtomicLong usedBytes = new AtomicLong(0);
UsedBytesCount(long maxBytes) {
this.maxBytes = maxBytes;
}
/**
* Try to reserve more bytes.
*
@ -65,7 +71,7 @@ public class PmemVolumeManager {
while (true) {
long cur = usedBytes.get();
long next = cur + bytesCount;
if (next > cacheCapacity) {
if (next > maxBytes) {
return -1;
}
if (usedBytes.compareAndSet(cur, next)) {
@ -85,42 +91,76 @@ public class PmemVolumeManager {
return usedBytes.addAndGet(-bytesCount);
}
long get() {
long getUsedBytes() {
return usedBytes.get();
}
long getMaxBytes() {
return maxBytes;
}
long getAvailableBytes() {
return maxBytes - usedBytes.get();
}
}
private static final Logger LOG =
LoggerFactory.getLogger(PmemVolumeManager.class);
public static final String CACHE_DIR = "hdfs_pmem_cache";
private static PmemVolumeManager pmemVolumeManager = null;
private final ArrayList<String> pmemVolumes = new ArrayList<>();
// Maintain which pmem volume a block is cached to.
private final Map<ExtendedBlockId, Byte> blockKeyToVolume =
new ConcurrentHashMap<>();
private final UsedBytesCount usedBytesCount;
private final List<UsedBytesCount> usedBytesCounts = new ArrayList<>();
/**
* The total cache capacity in bytes of persistent memory.
* It is 0L if the specific mappableBlockLoader couldn't cache data to pmem.
*/
private final long cacheCapacity;
private long cacheCapacity;
private static long maxBytesPerPmem = -1;
private int count = 0;
// Strict atomic operation is not guaranteed for the performance sake.
private int i = 0;
private byte nextIndex = 0;
PmemVolumeManager(long maxBytes, String[] pmemVolumesConfigured)
throws IOException {
if (pmemVolumesConfigured == null || pmemVolumesConfigured.length == 0) {
private PmemVolumeManager(String[] pmemVolumesConfig) throws IOException {
if (pmemVolumesConfig == null || pmemVolumesConfig.length == 0) {
throw new IOException("The persistent memory volume, " +
DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY +
" is not configured!");
}
this.loadVolumes(pmemVolumesConfigured);
this.usedBytesCount = new UsedBytesCount();
this.cacheCapacity = maxBytes;
this.loadVolumes(pmemVolumesConfig);
cacheCapacity = 0L;
for (UsedBytesCount counter : usedBytesCounts) {
cacheCapacity += counter.getMaxBytes();
}
}
public synchronized static void init(String[] pmemVolumesConfig)
throws IOException {
if (pmemVolumeManager == null) {
pmemVolumeManager = new PmemVolumeManager(pmemVolumesConfig);
}
}
public static PmemVolumeManager getInstance() {
if (pmemVolumeManager == null) {
throw new RuntimeException(
"The pmemVolumeManager should be instantiated!");
}
return pmemVolumeManager;
}
@VisibleForTesting
public static void setMaxBytes(long maxBytes) {
maxBytesPerPmem = maxBytes;
}
public long getCacheUsed() {
return usedBytesCount.get();
long usedBytes = 0L;
for (UsedBytesCount counter : usedBytesCounts) {
usedBytes += counter.getUsedBytes();
}
return usedBytes;
}
public long getCacheCapacity() {
@ -130,24 +170,40 @@ public class PmemVolumeManager {
/**
* Try to reserve more bytes on persistent memory.
*
* @param key The ExtendedBlockId for a block.
*
* @param bytesCount The number of bytes to add.
*
* @return The new number of usedBytes if we succeeded;
* -1 if we failed.
*/
long reserve(long bytesCount) {
return usedBytesCount.reserve(bytesCount);
synchronized long reserve(ExtendedBlockId key, long bytesCount) {
try {
byte index = chooseVolume(bytesCount);
long usedBytes = usedBytesCounts.get(index).reserve(bytesCount);
// Put the entry into blockKeyToVolume if reserving bytes succeeded.
if (usedBytes > 0) {
blockKeyToVolume.put(key, index);
}
return usedBytes;
} catch (IOException e) {
LOG.warn(e.getMessage());
return -1L;
}
}
/**
* Release some bytes that we're using on persistent memory.
*
* @param key The ExtendedBlockId for a block.
*
* @param bytesCount The number of bytes to release.
*
* @return The new number of usedBytes.
*/
long release(long bytesCount) {
return usedBytesCount.release(bytesCount);
long release(ExtendedBlockId key, long bytesCount) {
Byte index = blockKeyToVolume.remove(key);
return usedBytesCounts.get(index).release(bytesCount);
}
/**
@ -155,46 +211,70 @@ public class PmemVolumeManager {
*
* @throws IOException If there is no available pmem volume.
*/
private void loadVolumes(String[] volumes) throws IOException {
private void loadVolumes(String[] volumes)
throws IOException {
// Check whether the volume exists
for (String volume: volumes) {
for (byte n = 0; n < volumes.length; n++) {
try {
File pmemDir = new File(volume);
verifyIfValidPmemVolume(pmemDir);
// Remove all files under the volume.
FileUtils.cleanDirectory(pmemDir);
File pmemDir = new File(volumes[n]);
File realPmemDir = verifyIfValidPmemVolume(pmemDir);
this.pmemVolumes.add(realPmemDir.getPath());
long maxBytes;
if (maxBytesPerPmem == -1) {
maxBytes = realPmemDir.getUsableSpace();
} else {
maxBytes = maxBytesPerPmem;
}
UsedBytesCount usedBytesCount = new UsedBytesCount(maxBytes);
this.usedBytesCounts.add(usedBytesCount);
LOG.info("Added persistent memory - {} with size={}",
volumes[n], maxBytes);
} catch (IllegalArgumentException e) {
LOG.error("Failed to parse persistent memory volume " + volume, e);
LOG.error("Failed to parse persistent memory volume " + volumes[n], e);
continue;
} catch (IOException e) {
LOG.error("Bad persistent memory volume: " + volume, e);
LOG.error("Bad persistent memory volume: " + volumes[n], e);
continue;
}
pmemVolumes.add(volume);
LOG.info("Added persistent memory - " + volume);
}
count = pmemVolumes.size();
if (count == 0) {
throw new IOException(
"At least one valid persistent memory volume is required!");
}
cleanup();
}
void cleanup() {
// Remove all files under the volume.
for (String pmemDir: pmemVolumes) {
try {
FileUtils.cleanDirectory(new File(pmemDir));
} catch (IOException e) {
LOG.error("Failed to clean up " + pmemDir, e);
}
}
}
@VisibleForTesting
static void verifyIfValidPmemVolume(File pmemDir)
static File verifyIfValidPmemVolume(File pmemDir)
throws IOException {
if (!pmemDir.exists()) {
final String message = pmemDir + " does not exist";
throw new IOException(message);
}
if (!pmemDir.isDirectory()) {
final String message = pmemDir + " is not a directory";
throw new IllegalArgumentException(message);
}
File realPmemDir = new File(getRealPmemDir(pmemDir.getPath()));
if (!realPmemDir.exists() && !realPmemDir.mkdir()) {
throw new IOException("Failed to create " + realPmemDir.getPath());
}
String uuidStr = UUID.randomUUID().toString();
String testFilePath = pmemDir.getPath() + "/.verify.pmem." + uuidStr;
String testFilePath = realPmemDir.getPath() + "/.verify.pmem." + uuidStr;
byte[] contents = uuidStr.getBytes("UTF-8");
RandomAccessFile testFile = null;
MappedByteBuffer out = null;
@ -203,15 +283,17 @@ public class PmemVolumeManager {
out = testFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0,
contents.length);
if (out == null) {
throw new IOException("Failed to map the test file under " + pmemDir);
throw new IOException(
"Failed to map the test file under " + realPmemDir);
}
out.put(contents);
// Forces to write data to storage device containing the mapped file
out.force();
return realPmemDir;
} catch (IOException e) {
throw new IOException(
"Exception while writing data to persistent storage dir: " +
pmemDir, e);
realPmemDir, e);
} finally {
if (out != null) {
out.clear();
@ -229,18 +311,38 @@ public class PmemVolumeManager {
}
}
public static String getRealPmemDir(String rawPmemDir) {
return new File(rawPmemDir, CACHE_DIR).getAbsolutePath();
}
/**
* Choose a persistent memory volume based on a specific algorithm.
* Currently it is a round-robin policy.
*
* TODO: Refine volume selection policy by considering storage utilization.
*/
Byte getOneVolumeIndex() throws IOException {
if (count != 0) {
return (byte)(i++ % count);
} else {
synchronized Byte chooseVolume(long bytesCount) throws IOException {
if (count == 0) {
throw new IOException("No usable persistent memory is found");
}
int k = 0;
long maxAvailableSpace = 0L;
while (k++ != count) {
if (nextIndex == count) {
nextIndex = 0;
}
byte index = nextIndex++;
long availableBytes = usedBytesCounts.get(index).getAvailableBytes();
if (availableBytes >= bytesCount) {
return index;
}
if (availableBytes > maxAvailableSpace) {
maxAvailableSpace = availableBytes;
}
}
throw new IOException("There is no enough persistent memory space " +
"for caching. The current max available space is " +
maxAvailableSpace + ", but " + bytesCount + "is required.");
}
@VisibleForTesting
@ -276,7 +378,7 @@ public class PmemVolumeManager {
/**
* The cache file path is pmemVolume/BlockPoolId-BlockId.
*/
public String getCacheFilePath(ExtendedBlockId key) {
public String getCachePath(ExtendedBlockId key) {
Byte volumeIndex = blockKeyToVolume.get(key);
if (volumeIndex == null) {
return null;
@ -288,19 +390,4 @@ public class PmemVolumeManager {
Map<ExtendedBlockId, Byte> getBlockKeyToVolume() {
return blockKeyToVolume;
}
/**
* Add cached block's ExtendedBlockId and its cache volume index to a map
* after cache.
*/
public void afterCache(ExtendedBlockId key, Byte volumeIndex) {
blockKeyToVolume.put(key, volumeIndex);
}
/**
* Remove the record in blockKeyToVolume for uncached block after uncache.
*/
public void afterUncache(ExtendedBlockId key) {
blockKeyToVolume.remove(key);
}
}

View File

@ -2510,18 +2510,6 @@
</description>
</property>
<property>
<name>dfs.datanode.cache.loader.class</name>
<value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader</value>
<description>
Currently, the available cache loaders are MemoryMappableBlockLoader, PmemMappableBlockLoader.
By default, MemoryMappableBlockLoader is used and it maps block replica into memory.
PmemMappableBlockLoader can map block to persistent memory with mapped byte buffer, which is
implemented by Java code. The value of dfs.datanode.cache.pmem.dirs specifies the persistent
memory directory.
</description>
</property>
<property>
<name>dfs.datanode.max.locked.memory</name>
<value>0</value>
@ -2538,18 +2526,6 @@
</description>
</property>
<property>
<name>dfs.datanode.cache.pmem.capacity</name>
<value>0</value>
<description>
The amount of persistent memory in bytes that can be used to cache block
replicas to persistent memory. Currently, persistent memory is only enabled
in HDFS Centralized Cache Management feature.
By default, this parameter is 0, which disables persistent memory caching.
</description>
</property>
<property>
<name>dfs.datanode.cache.pmem.dirs</name>
<value></value>

View File

@ -21,8 +21,6 @@ import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
@ -139,14 +137,11 @@ public class TestCacheByPmemMappableBlockLoader {
conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
// Configuration for pmem cache
conf.set(DFS_DATANODE_CACHE_LOADER_CLASS,
"org.apache.hadoop.hdfs.server.datanode." +
"fsdataset.impl.PmemMappableBlockLoader");
new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
// Configure two bogus pmem volumes
conf.set(DFS_DATANODE_CACHE_PMEM_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
conf.setLong(DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY, CACHE_CAPACITY);
PmemVolumeManager.setMaxBytes((long) (CACHE_CAPACITY * 0.5));
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
@ -183,18 +178,17 @@ public class TestCacheByPmemMappableBlockLoader {
@Test
public void testPmemVolumeManager() throws IOException {
PmemVolumeManager pmemVolumeManager =
cacheLoader.getPmemVolumeManager();
PmemVolumeManager pmemVolumeManager = PmemVolumeManager.getInstance();
assertNotNull(pmemVolumeManager);
assertEquals(CACHE_CAPACITY, pmemVolumeManager.getCacheCapacity());
// Test round-robin selection policy
long count1 = 0, count2 = 0;
for (int i = 0; i < 10; i++) {
Byte index = pmemVolumeManager.getOneVolumeIndex();
Byte index = pmemVolumeManager.chooseVolume(BLOCK_SIZE);
String volume = pmemVolumeManager.getVolumeByIndex(index);
if (volume.equals(PMEM_DIR_0)) {
if (volume.equals(PmemVolumeManager.getRealPmemDir(PMEM_DIR_0))) {
count1++;
} else if (volume.equals(PMEM_DIR_1)) {
} else if (volume.equals(PmemVolumeManager.getRealPmemDir(PMEM_DIR_1))) {
count2++;
} else {
fail("Unexpected persistent storage location:" + volume);
@ -254,7 +248,7 @@ public class TestCacheByPmemMappableBlockLoader {
// The pmem cache space is expected to have been used up.
assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheUsed());
Map<ExtendedBlockId, Byte> blockKeyToVolume =
cacheLoader.getPmemVolumeManager().getBlockKeyToVolume();
PmemVolumeManager.getInstance().getBlockKeyToVolume();
// All block keys should be kept in blockKeyToVolume
assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum);
assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
@ -266,11 +260,13 @@ public class TestCacheByPmemMappableBlockLoader {
// to pmem.
assertNotNull(cachePath);
String expectFileName =
cacheLoader.getPmemVolumeManager().getCacheFileName(key);
PmemVolumeManager.getInstance().getCacheFileName(key);
if (cachePath.startsWith(PMEM_DIR_0)) {
assertTrue(cachePath.equals(PMEM_DIR_0 + "/" + expectFileName));
assertTrue(cachePath.equals(PmemVolumeManager
.getRealPmemDir(PMEM_DIR_0) + "/" + expectFileName));
} else if (cachePath.startsWith(PMEM_DIR_1)) {
assertTrue(cachePath.equals(PMEM_DIR_1 + "/" + expectFileName));
assertTrue(cachePath.equals(PmemVolumeManager
.getRealPmemDir(PMEM_DIR_1) + "/" + expectFileName));
} else {
fail("The cache path is not the expected one: " + cachePath);
}

View File

@ -401,9 +401,10 @@ public class TestFsDatasetCache {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
// check the log reported by FsDataSetCache
// in the case that cache capacity is exceeded.
int lines = appender.countLinesWithMessage(
"more bytes in the cache: " +
DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
"could not reserve more bytes in the cache: ");
return lines > 0;
}
}, 500, 30000);