HDFS-14458. Report pmem stats to namenode. Contributed by Feilong He.

This commit is contained in:
Rakesh Radhakrishnan 2019-07-15 13:02:37 +05:30
parent 0976f6fc30
commit e98adb00b7
9 changed files with 45 additions and 60 deletions

View File

@ -27,7 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
/** /**
* Keeps statistics for the memory cache. * Keeps statistics for the memory cache.
*/ */
class MemoryCacheStats { class CacheStats {
/** /**
* The approximate amount of cache space in use. * The approximate amount of cache space in use.
@ -47,7 +47,7 @@ class MemoryCacheStats {
*/ */
private final long maxBytes; private final long maxBytes;
MemoryCacheStats(long maxBytes) { CacheStats(long maxBytes) {
this.usedBytesCount = new UsedBytesCount(); this.usedBytesCount = new UsedBytesCount();
this.maxBytes = maxBytes; this.maxBytes = maxBytes;
} }
@ -81,7 +81,7 @@ class MemoryCacheStats {
private class UsedBytesCount { private class UsedBytesCount {
private final AtomicLong usedBytes = new AtomicLong(0); private final AtomicLong usedBytes = new AtomicLong(0);
private MemoryCacheStats.PageRounder rounder = new PageRounder(); private CacheStats.PageRounder rounder = new PageRounder();
/** /**
* Try to reserve more bytes. * Try to reserve more bytes.

View File

@ -23,7 +23,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -137,7 +136,7 @@ public class FsDatasetCache {
*/ */
private final MappableBlockLoader cacheLoader; private final MappableBlockLoader cacheLoader;
private final MemoryCacheStats memCacheStats; private final CacheStats memCacheStats;
/** /**
* Number of cache commands that could not be completed successfully * Number of cache commands that could not be completed successfully
@ -178,30 +177,17 @@ public class FsDatasetCache {
". Reconfigure this to " + minRevocationPollingMs); ". Reconfigure this to " + minRevocationPollingMs);
} }
this.revocationPollingMs = confRevocationPollingMs; this.revocationPollingMs = confRevocationPollingMs;
// Both lazy writer and read cache are sharing this statistics.
this.memCacheStats = new MemoryCacheStats(
dataset.datanode.getDnConf().getMaxLockedMemory());
this.cacheLoader = MappableBlockLoaderFactory.createCacheLoader( this.cacheLoader = MappableBlockLoaderFactory.createCacheLoader(
this.getDnConf()); this.getDnConf());
cacheLoader.initialize(this); // Both lazy writer and read cache are sharing this statistics.
} this.memCacheStats = cacheLoader.initialize(this.getDnConf());
/**
* Check if pmem cache is enabled.
*/
private boolean isPmemCacheEnabled() {
return !cacheLoader.isTransientCache();
} }
DNConf getDnConf() { DNConf getDnConf() {
return this.dataset.datanode.getDnConf(); return this.dataset.datanode.getDnConf();
} }
MemoryCacheStats getMemCacheStats() {
return memCacheStats;
}
/** /**
* Get the cache path if the replica is cached into persistent memory. * Get the cache path if the replica is cached into persistent memory.
*/ */
@ -557,37 +543,32 @@ public class FsDatasetCache {
/** /**
* Get the approximate amount of DRAM cache space used. * Get the approximate amount of DRAM cache space used.
*/ */
public long getCacheUsed() { public long getMemCacheUsed() {
return memCacheStats.getCacheUsed(); return memCacheStats.getCacheUsed();
} }
/** /**
* Get the approximate amount of persistent memory cache space used. * Get the approximate amount of cache space used either on DRAM or
* TODO: advertise this metric to NameNode by FSDatasetMBean * on persistent memory.
* @return
*/ */
public long getPmemCacheUsed() { public long getCacheUsed() {
if (isPmemCacheEnabled()) { return cacheLoader.getCacheUsed();
return cacheLoader.getCacheUsed();
}
return 0;
} }
/** /**
* Get the maximum amount of bytes we can cache on DRAM. This is a constant. * Get the maximum amount of bytes we can cache on DRAM. This is a constant.
*/ */
public long getCacheCapacity() { public long getMemCacheCapacity() {
return memCacheStats.getCacheCapacity(); return memCacheStats.getCacheCapacity();
} }
/** /**
* Get cache capacity of persistent memory. * Get the maximum amount of bytes we can cache either on DRAM or
* TODO: advertise this metric to NameNode by FSDatasetMBean * on persistent memory. This is a constant.
*/ */
public long getPmemCacheCapacity() { public long getCacheCapacity() {
if (isPmemCacheEnabled()) { return cacheLoader.getCacheCapacity();
return cacheLoader.getCacheCapacity();
}
return 0;
} }
public long getNumBlocksFailedToCache() { public long getNumBlocksFailedToCache() {
@ -608,11 +589,6 @@ public class FsDatasetCache {
return (val != null) && val.state.shouldAdvertise(); return (val != null) && val.state.shouldAdvertise();
} }
@VisibleForTesting
MappableBlockLoader getCacheLoader() {
return cacheLoader;
}
/** /**
* This method can be executed during DataNode shutdown. * This method can be executed during DataNode shutdown.
*/ */

View File

@ -3164,10 +3164,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public void evictBlocks(long bytesNeeded) throws IOException { public void evictBlocks(long bytesNeeded) throws IOException {
int iterations = 0; int iterations = 0;
final long cacheCapacity = cacheManager.getCacheCapacity(); final long cacheCapacity = cacheManager.getMemCacheCapacity();
while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION && while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
(cacheCapacity - cacheManager.getCacheUsed()) < bytesNeeded) { (cacheCapacity - cacheManager.getMemCacheUsed()) < bytesNeeded) {
RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction(); RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
if (replicaState == null) { if (replicaState == null) {

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
@ -43,7 +44,7 @@ public abstract class MappableBlockLoader {
/** /**
* Initialize a specific MappableBlockLoader. * Initialize a specific MappableBlockLoader.
*/ */
abstract void initialize(FsDatasetCache cacheManager) throws IOException; abstract CacheStats initialize(DNConf dnConf) throws IOException;
/** /**
* Load the block. * Load the block.

View File

@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -39,12 +40,13 @@ import java.nio.channels.FileChannel;
public class MemoryMappableBlockLoader extends MappableBlockLoader { public class MemoryMappableBlockLoader extends MappableBlockLoader {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(MemoryMappableBlockLoader.class); LoggerFactory.getLogger(MemoryMappableBlockLoader.class);
private MemoryCacheStats memCacheStats; private CacheStats memCacheStats;
@Override @Override
void initialize(FsDatasetCache cacheManager) throws IOException { CacheStats initialize(DNConf dnConf) throws IOException {
LOG.info("Initializing cache loader: MemoryMappableBlockLoader."); LOG.info("Initializing cache loader: MemoryMappableBlockLoader.");
this.memCacheStats = cacheManager.getMemCacheStats(); this.memCacheStats = new CacheStats(dnConf.getMaxLockedMemory());
return memCacheStats;
} }
/** /**

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX; import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -47,8 +48,8 @@ public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader {
LoggerFactory.getLogger(NativePmemMappableBlockLoader.class); LoggerFactory.getLogger(NativePmemMappableBlockLoader.class);
@Override @Override
void initialize(FsDatasetCache cacheManager) throws IOException { CacheStats initialize(DNConf dnConf) throws IOException {
super.initialize(cacheManager); return super.initialize(dnConf);
} }
/** /**

View File

@ -42,11 +42,16 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
private PmemVolumeManager pmemVolumeManager; private PmemVolumeManager pmemVolumeManager;
@Override @Override
void initialize(FsDatasetCache cacheManager) throws IOException { CacheStats initialize(DNConf dnConf) throws IOException {
LOG.info("Initializing cache loader: " + this.getClass().getName()); LOG.info("Initializing cache loader: " + this.getClass().getName());
DNConf dnConf = cacheManager.getDnConf();
PmemVolumeManager.init(dnConf.getPmemVolumes()); PmemVolumeManager.init(dnConf.getPmemVolumes());
pmemVolumeManager = PmemVolumeManager.getInstance(); pmemVolumeManager = PmemVolumeManager.getInstance();
// The configuration for max locked memory is shaded.
LOG.info("Persistent memory is used for caching data instead of " +
"DRAM. Max locked memory is set to zero to disable DRAM cache");
// TODO: PMem is not supporting Lazy Writer now, will refine this stats
// while implementing it.
return new CacheStats(0L);
} }
/** /**

View File

@ -84,7 +84,6 @@ public class TestCacheByPmemMappableBlockLoader {
private static DistributedFileSystem fs; private static DistributedFileSystem fs;
private static DataNode dn; private static DataNode dn;
private static FsDatasetCache cacheManager; private static FsDatasetCache cacheManager;
private static PmemMappableBlockLoader cacheLoader;
/** /**
* Used to pause DN BPServiceActor threads. BPSA threads acquire the * Used to pause DN BPServiceActor threads. BPSA threads acquire the
* shared read lock. The test acquires the write lock for exclusive access. * shared read lock. The test acquires the write lock for exclusive access.
@ -131,8 +130,6 @@ public class TestCacheByPmemMappableBlockLoader {
DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100); DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500); conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
CACHE_CAPACITY);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10); conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
@ -153,7 +150,6 @@ public class TestCacheByPmemMappableBlockLoader {
fs = cluster.getFileSystem(); fs = cluster.getFileSystem();
dn = cluster.getDataNodes().get(0); dn = cluster.getDataNodes().get(0);
cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager; cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager;
cacheLoader = (PmemMappableBlockLoader) cacheManager.getCacheLoader();
} }
@After @After
@ -216,7 +212,9 @@ public class TestCacheByPmemMappableBlockLoader {
Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE); Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE);
BlockReaderTestUtil.enableHdfsCachingTracing(); BlockReaderTestUtil.enableHdfsCachingTracing();
Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE); Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE);
assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheCapacity()); assertEquals(CACHE_CAPACITY, cacheManager.getCacheCapacity());
// DRAM cache is expected to be disabled.
assertEquals(0L, cacheManager.getMemCacheCapacity());
final Path testFile = new Path("/testFile"); final Path testFile = new Path("/testFile");
final long testFileLen = maxCacheBlocksNum * BLOCK_SIZE; final long testFileLen = maxCacheBlocksNum * BLOCK_SIZE;
@ -246,7 +244,9 @@ public class TestCacheByPmemMappableBlockLoader {
}, 1000, 30000); }, 1000, 30000);
// The pmem cache space is expected to have been used up. // The pmem cache space is expected to have been used up.
assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheUsed()); assertEquals(CACHE_CAPACITY, cacheManager.getCacheUsed());
// There should be no cache used on DRAM.
assertEquals(0L, cacheManager.getMemCacheUsed());
Map<ExtendedBlockId, Byte> blockKeyToVolume = Map<ExtendedBlockId, Byte> blockKeyToVolume =
PmemVolumeManager.getInstance().getBlockKeyToVolume(); PmemVolumeManager.getInstance().getBlockKeyToVolume();
// All block keys should be kept in blockKeyToVolume // All block keys should be kept in blockKeyToVolume
@ -318,7 +318,7 @@ public class TestCacheByPmemMappableBlockLoader {
}, 1000, 30000); }, 1000, 30000);
// It is expected that no pmem cache space is used. // It is expected that no pmem cache space is used.
assertEquals(0, cacheManager.getPmemCacheUsed()); assertEquals(0, cacheManager.getCacheUsed());
// No record should be kept by blockKeyToVolume after testFile is uncached. // No record should be kept by blockKeyToVolume after testFile is uncached.
assertEquals(blockKeyToVolume.size(), 0); assertEquals(blockKeyToVolume.size(), 0);
} }

View File

@ -63,7 +63,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryCacheStats.PageRounder; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.CacheStats.PageRounder;
import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;