HDFS-14458. Report pmem stats to namenode. Contributed by Feilong He.
(cherry picked from commit e98adb00b7
)
This commit is contained in:
parent
9281b72550
commit
df0dcc7493
|
@ -27,7 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
/**
|
/**
|
||||||
* Keeps statistics for the memory cache.
|
* Keeps statistics for the memory cache.
|
||||||
*/
|
*/
|
||||||
class MemoryCacheStats {
|
class CacheStats {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The approximate amount of cache space in use.
|
* The approximate amount of cache space in use.
|
||||||
|
@ -47,7 +47,7 @@ class MemoryCacheStats {
|
||||||
*/
|
*/
|
||||||
private final long maxBytes;
|
private final long maxBytes;
|
||||||
|
|
||||||
MemoryCacheStats(long maxBytes) {
|
CacheStats(long maxBytes) {
|
||||||
this.usedBytesCount = new UsedBytesCount();
|
this.usedBytesCount = new UsedBytesCount();
|
||||||
this.maxBytes = maxBytes;
|
this.maxBytes = maxBytes;
|
||||||
}
|
}
|
||||||
|
@ -81,7 +81,7 @@ class MemoryCacheStats {
|
||||||
private class UsedBytesCount {
|
private class UsedBytesCount {
|
||||||
private final AtomicLong usedBytes = new AtomicLong(0);
|
private final AtomicLong usedBytes = new AtomicLong(0);
|
||||||
|
|
||||||
private MemoryCacheStats.PageRounder rounder = new PageRounder();
|
private CacheStats.PageRounder rounder = new PageRounder();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Try to reserve more bytes.
|
* Try to reserve more bytes.
|
|
@ -23,7 +23,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
|
@ -137,7 +136,7 @@ public class FsDatasetCache {
|
||||||
*/
|
*/
|
||||||
private final MappableBlockLoader cacheLoader;
|
private final MappableBlockLoader cacheLoader;
|
||||||
|
|
||||||
private final MemoryCacheStats memCacheStats;
|
private final CacheStats memCacheStats;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Number of cache commands that could not be completed successfully
|
* Number of cache commands that could not be completed successfully
|
||||||
|
@ -178,30 +177,17 @@ public class FsDatasetCache {
|
||||||
". Reconfigure this to " + minRevocationPollingMs);
|
". Reconfigure this to " + minRevocationPollingMs);
|
||||||
}
|
}
|
||||||
this.revocationPollingMs = confRevocationPollingMs;
|
this.revocationPollingMs = confRevocationPollingMs;
|
||||||
// Both lazy writer and read cache are sharing this statistics.
|
|
||||||
this.memCacheStats = new MemoryCacheStats(
|
|
||||||
dataset.datanode.getDnConf().getMaxLockedMemory());
|
|
||||||
|
|
||||||
this.cacheLoader = MappableBlockLoaderFactory.createCacheLoader(
|
this.cacheLoader = MappableBlockLoaderFactory.createCacheLoader(
|
||||||
this.getDnConf());
|
this.getDnConf());
|
||||||
cacheLoader.initialize(this);
|
// Both lazy writer and read cache are sharing this statistics.
|
||||||
}
|
this.memCacheStats = cacheLoader.initialize(this.getDnConf());
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if pmem cache is enabled.
|
|
||||||
*/
|
|
||||||
private boolean isPmemCacheEnabled() {
|
|
||||||
return !cacheLoader.isTransientCache();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DNConf getDnConf() {
|
DNConf getDnConf() {
|
||||||
return this.dataset.datanode.getDnConf();
|
return this.dataset.datanode.getDnConf();
|
||||||
}
|
}
|
||||||
|
|
||||||
MemoryCacheStats getMemCacheStats() {
|
|
||||||
return memCacheStats;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the cache path if the replica is cached into persistent memory.
|
* Get the cache path if the replica is cached into persistent memory.
|
||||||
*/
|
*/
|
||||||
|
@ -557,37 +543,32 @@ public class FsDatasetCache {
|
||||||
/**
|
/**
|
||||||
* Get the approximate amount of DRAM cache space used.
|
* Get the approximate amount of DRAM cache space used.
|
||||||
*/
|
*/
|
||||||
public long getCacheUsed() {
|
public long getMemCacheUsed() {
|
||||||
return memCacheStats.getCacheUsed();
|
return memCacheStats.getCacheUsed();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the approximate amount of persistent memory cache space used.
|
* Get the approximate amount of cache space used either on DRAM or
|
||||||
* TODO: advertise this metric to NameNode by FSDatasetMBean
|
* on persistent memory.
|
||||||
|
* @return
|
||||||
*/
|
*/
|
||||||
public long getPmemCacheUsed() {
|
public long getCacheUsed() {
|
||||||
if (isPmemCacheEnabled()) {
|
return cacheLoader.getCacheUsed();
|
||||||
return cacheLoader.getCacheUsed();
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the maximum amount of bytes we can cache on DRAM. This is a constant.
|
* Get the maximum amount of bytes we can cache on DRAM. This is a constant.
|
||||||
*/
|
*/
|
||||||
public long getCacheCapacity() {
|
public long getMemCacheCapacity() {
|
||||||
return memCacheStats.getCacheCapacity();
|
return memCacheStats.getCacheCapacity();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get cache capacity of persistent memory.
|
* Get the maximum amount of bytes we can cache either on DRAM or
|
||||||
* TODO: advertise this metric to NameNode by FSDatasetMBean
|
* on persistent memory. This is a constant.
|
||||||
*/
|
*/
|
||||||
public long getPmemCacheCapacity() {
|
public long getCacheCapacity() {
|
||||||
if (isPmemCacheEnabled()) {
|
return cacheLoader.getCacheCapacity();
|
||||||
return cacheLoader.getCacheCapacity();
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getNumBlocksFailedToCache() {
|
public long getNumBlocksFailedToCache() {
|
||||||
|
@ -608,11 +589,6 @@ public class FsDatasetCache {
|
||||||
return (val != null) && val.state.shouldAdvertise();
|
return (val != null) && val.state.shouldAdvertise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
MappableBlockLoader getCacheLoader() {
|
|
||||||
return cacheLoader;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method can be executed during DataNode shutdown.
|
* This method can be executed during DataNode shutdown.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -3186,10 +3186,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
public void evictBlocks(long bytesNeeded) throws IOException {
|
public void evictBlocks(long bytesNeeded) throws IOException {
|
||||||
int iterations = 0;
|
int iterations = 0;
|
||||||
|
|
||||||
final long cacheCapacity = cacheManager.getCacheCapacity();
|
final long cacheCapacity = cacheManager.getMemCacheCapacity();
|
||||||
|
|
||||||
while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
|
while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
|
||||||
(cacheCapacity - cacheManager.getCacheUsed()) < bytesNeeded) {
|
(cacheCapacity - cacheManager.getMemCacheUsed()) < bytesNeeded) {
|
||||||
RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
|
RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
|
||||||
|
|
||||||
if (replicaState == null) {
|
if (replicaState == null) {
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DNConf;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
|
|
||||||
import java.io.BufferedInputStream;
|
import java.io.BufferedInputStream;
|
||||||
|
@ -43,7 +44,7 @@ public abstract class MappableBlockLoader {
|
||||||
/**
|
/**
|
||||||
* Initialize a specific MappableBlockLoader.
|
* Initialize a specific MappableBlockLoader.
|
||||||
*/
|
*/
|
||||||
abstract void initialize(FsDatasetCache cacheManager) throws IOException;
|
abstract CacheStats initialize(DNConf dnConf) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load the block.
|
* Load the block.
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DNConf;
|
||||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -39,12 +40,13 @@ import java.nio.channels.FileChannel;
|
||||||
public class MemoryMappableBlockLoader extends MappableBlockLoader {
|
public class MemoryMappableBlockLoader extends MappableBlockLoader {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(MemoryMappableBlockLoader.class);
|
LoggerFactory.getLogger(MemoryMappableBlockLoader.class);
|
||||||
private MemoryCacheStats memCacheStats;
|
private CacheStats memCacheStats;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void initialize(FsDatasetCache cacheManager) throws IOException {
|
CacheStats initialize(DNConf dnConf) throws IOException {
|
||||||
LOG.info("Initializing cache loader: MemoryMappableBlockLoader.");
|
LOG.info("Initializing cache loader: MemoryMappableBlockLoader.");
|
||||||
this.memCacheStats = cacheManager.getMemCacheStats();
|
this.memCacheStats = new CacheStats(dnConf.getMaxLockedMemory());
|
||||||
|
return memCacheStats;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DNConf;
|
||||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||||
import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
|
import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
|
@ -47,8 +48,8 @@ public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader {
|
||||||
LoggerFactory.getLogger(NativePmemMappableBlockLoader.class);
|
LoggerFactory.getLogger(NativePmemMappableBlockLoader.class);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void initialize(FsDatasetCache cacheManager) throws IOException {
|
CacheStats initialize(DNConf dnConf) throws IOException {
|
||||||
super.initialize(cacheManager);
|
return super.initialize(dnConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -42,11 +42,16 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
|
||||||
private PmemVolumeManager pmemVolumeManager;
|
private PmemVolumeManager pmemVolumeManager;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void initialize(FsDatasetCache cacheManager) throws IOException {
|
CacheStats initialize(DNConf dnConf) throws IOException {
|
||||||
LOG.info("Initializing cache loader: " + this.getClass().getName());
|
LOG.info("Initializing cache loader: " + this.getClass().getName());
|
||||||
DNConf dnConf = cacheManager.getDnConf();
|
|
||||||
PmemVolumeManager.init(dnConf.getPmemVolumes());
|
PmemVolumeManager.init(dnConf.getPmemVolumes());
|
||||||
pmemVolumeManager = PmemVolumeManager.getInstance();
|
pmemVolumeManager = PmemVolumeManager.getInstance();
|
||||||
|
// The configuration for max locked memory is shaded.
|
||||||
|
LOG.info("Persistent memory is used for caching data instead of " +
|
||||||
|
"DRAM. Max locked memory is set to zero to disable DRAM cache");
|
||||||
|
// TODO: PMem is not supporting Lazy Writer now, will refine this stats
|
||||||
|
// while implementing it.
|
||||||
|
return new CacheStats(0L);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -84,7 +84,6 @@ public class TestCacheByPmemMappableBlockLoader {
|
||||||
private static DistributedFileSystem fs;
|
private static DistributedFileSystem fs;
|
||||||
private static DataNode dn;
|
private static DataNode dn;
|
||||||
private static FsDatasetCache cacheManager;
|
private static FsDatasetCache cacheManager;
|
||||||
private static PmemMappableBlockLoader cacheLoader;
|
|
||||||
/**
|
/**
|
||||||
* Used to pause DN BPServiceActor threads. BPSA threads acquire the
|
* Used to pause DN BPServiceActor threads. BPSA threads acquire the
|
||||||
* shared read lock. The test acquires the write lock for exclusive access.
|
* shared read lock. The test acquires the write lock for exclusive access.
|
||||||
|
@ -131,8 +130,6 @@ public class TestCacheByPmemMappableBlockLoader {
|
||||||
DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
|
DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
|
||||||
conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
|
conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
|
||||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||||
conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
|
|
||||||
CACHE_CAPACITY);
|
|
||||||
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||||
conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
|
conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
|
||||||
|
|
||||||
|
@ -153,7 +150,6 @@ public class TestCacheByPmemMappableBlockLoader {
|
||||||
fs = cluster.getFileSystem();
|
fs = cluster.getFileSystem();
|
||||||
dn = cluster.getDataNodes().get(0);
|
dn = cluster.getDataNodes().get(0);
|
||||||
cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager;
|
cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager;
|
||||||
cacheLoader = (PmemMappableBlockLoader) cacheManager.getCacheLoader();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -216,7 +212,9 @@ public class TestCacheByPmemMappableBlockLoader {
|
||||||
Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE);
|
Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE);
|
||||||
BlockReaderTestUtil.enableHdfsCachingTracing();
|
BlockReaderTestUtil.enableHdfsCachingTracing();
|
||||||
Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE);
|
Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE);
|
||||||
assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheCapacity());
|
assertEquals(CACHE_CAPACITY, cacheManager.getCacheCapacity());
|
||||||
|
// DRAM cache is expected to be disabled.
|
||||||
|
assertEquals(0L, cacheManager.getMemCacheCapacity());
|
||||||
|
|
||||||
final Path testFile = new Path("/testFile");
|
final Path testFile = new Path("/testFile");
|
||||||
final long testFileLen = maxCacheBlocksNum * BLOCK_SIZE;
|
final long testFileLen = maxCacheBlocksNum * BLOCK_SIZE;
|
||||||
|
@ -246,7 +244,9 @@ public class TestCacheByPmemMappableBlockLoader {
|
||||||
}, 1000, 30000);
|
}, 1000, 30000);
|
||||||
|
|
||||||
// The pmem cache space is expected to have been used up.
|
// The pmem cache space is expected to have been used up.
|
||||||
assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheUsed());
|
assertEquals(CACHE_CAPACITY, cacheManager.getCacheUsed());
|
||||||
|
// There should be no cache used on DRAM.
|
||||||
|
assertEquals(0L, cacheManager.getMemCacheUsed());
|
||||||
Map<ExtendedBlockId, Byte> blockKeyToVolume =
|
Map<ExtendedBlockId, Byte> blockKeyToVolume =
|
||||||
PmemVolumeManager.getInstance().getBlockKeyToVolume();
|
PmemVolumeManager.getInstance().getBlockKeyToVolume();
|
||||||
// All block keys should be kept in blockKeyToVolume
|
// All block keys should be kept in blockKeyToVolume
|
||||||
|
@ -318,7 +318,7 @@ public class TestCacheByPmemMappableBlockLoader {
|
||||||
}, 1000, 30000);
|
}, 1000, 30000);
|
||||||
|
|
||||||
// It is expected that no pmem cache space is used.
|
// It is expected that no pmem cache space is used.
|
||||||
assertEquals(0, cacheManager.getPmemCacheUsed());
|
assertEquals(0, cacheManager.getCacheUsed());
|
||||||
// No record should be kept by blockKeyToVolume after testFile is uncached.
|
// No record should be kept by blockKeyToVolume after testFile is uncached.
|
||||||
assertEquals(blockKeyToVolume.size(), 0);
|
assertEquals(blockKeyToVolume.size(), 0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,7 +64,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryCacheStats.PageRounder;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.CacheStats.PageRounder;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
|
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
|
||||||
|
|
Loading…
Reference in New Issue