HDFS-5320. Add datanode caching metrics. Contributed by Andrew Wang.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540796 13f79535-47bb-0310-9956-ffa450edef68
commit 9673baa7e8
parent ec9ec0084e
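[Editor's note] A quick orientation, not part of the commit itself: the patch renames the heartbeat fields dnCacheCapacity/dnCacheUsed to cacheCapacity/cacheUsed end to end (protobuf, the two protocol translators, BPServiceActor, FSDatasetMBean and its implementations), and adds two failure counters, numBlocksFailedToCache and numBlocksFailedToUncache, which FsDatasetCache increments on every failed cache/uncache path and FsDatasetImpl exposes through FSDatasetMBean. A minimal sketch of reading the related datanode counters in a test, mirroring the MetricsAsserts pattern TestFsDatasetCache uses below; "dn" is assumed to be a DataNode obtained from a running MiniDFSCluster:

    // Sketch only: read the datanode's caching counters the way the test does.
    MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
    long cached = MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
    long uncached = MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);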
@@ -364,6 +364,8 @@ Trunk (Unreleased)
     HDFS-5482. DistributedFileSystem#listPathBasedCacheDirectives must support
     relative paths. (Colin Patrick McCabe via cnauroth)
 
+    HDFS-5320. Add datanode caching metrics. (wang)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -155,7 +155,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
 
   @Override
   public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
-      StorageReport[] reports, long dnCacheCapacity, long dnCacheUsed,
+      StorageReport[] reports, long cacheCapacity, long cacheUsed,
       int xmitsInProgress, int xceiverCount, int failedVolumes)
       throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
@@ -165,11 +165,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements
     for (StorageReport r : reports) {
       builder.addReports(PBHelper.convert(r));
     }
-    if (dnCacheCapacity != 0) {
-      builder.setDnCacheCapacity(dnCacheCapacity);
+    if (cacheCapacity != 0) {
+      builder.setCacheCapacity(cacheCapacity);
     }
-    if (dnCacheUsed != 0) {
-      builder.setDnCacheUsed(dnCacheUsed);
+    if (cacheUsed != 0) {
+      builder.setCacheUsed(cacheUsed);
     }
     HeartbeatResponseProto resp;
     try {
@@ -57,7 +57,6 @@ import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 
-import com.google.common.primitives.Longs;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
@@ -112,7 +111,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
             p.getBlockPoolUsed());
       }
       response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
-          report, request.getDnCacheCapacity(), request.getDnCacheUsed(),
+          report, request.getCacheCapacity(), request.getCacheUsed(),
           request.getXmitsInProgress(),
           request.getXceiverCount(), request.getFailedVolumes());
     } catch (IOException e) {
@@ -435,7 +435,7 @@ class BPServiceActor implements Runnable {
 
   DatanodeCommand cacheReport() throws IOException {
     // If caching is disabled, do not send a cache report
-    if (dn.getFSDataset().getDnCacheCapacity() == 0) {
+    if (dn.getFSDataset().getCacheCapacity() == 0) {
       return null;
     }
     // send cache report if timer has expired.
@@ -475,8 +475,8 @@ class BPServiceActor implements Runnable {
         dn.getFSDataset().getRemaining(),
         dn.getFSDataset().getBlockPoolUsed(bpos.getBlockPoolId())) };
     return bpNamenode.sendHeartbeat(bpRegistration, report,
-        dn.getFSDataset().getDnCacheCapacity(),
-        dn.getFSDataset().getDnCacheUsed(),
+        dn.getFSDataset().getCacheCapacity(),
+        dn.getFSDataset().getCacheUsed(),
         dn.getXmitsInProgress(),
         dn.getXceiverCount(),
         dn.getFSDataset().getNumFailedVolumes());
@@ -226,6 +226,15 @@ public class FsDatasetCache {
    */
   private final long maxBytes;
 
+  /**
+   * Number of cache commands that could not be completed successfully
+   */
+  AtomicLong numBlocksFailedToCache = new AtomicLong(0);
+  /**
+   * Number of uncache commands that could not be completed successfully
+   */
+  AtomicLong numBlocksFailedToUncache = new AtomicLong(0);
+
   public FsDatasetCache(FsDatasetImpl dataset) {
     this.dataset = dataset;
     this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory();
@@ -274,6 +283,7 @@ public class FsDatasetCache {
             " already exists in the FsDatasetCache with state " +
             prevValue.state);
       }
+      numBlocksFailedToCache.incrementAndGet();
       return;
     }
     mappableBlockMap.put(key, new Value(null, State.CACHING));
@@ -291,6 +301,7 @@ public class FsDatasetCache {
             "does not need to be uncached, because it is not currently " +
             "in the mappableBlockMap.");
       }
+      numBlocksFailedToUncache.incrementAndGet();
       return;
     }
     switch (prevValue.state) {
@@ -317,6 +328,7 @@ public class FsDatasetCache {
             "does not need to be uncached, because it is " +
             "in state " + prevValue.state + ".");
       }
+      numBlocksFailedToUncache.incrementAndGet();
       break;
     }
   }
@@ -350,6 +362,7 @@ public class FsDatasetCache {
           ": could not reserve " + length + " more bytes in the " +
           "cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
           " of " + maxBytes + " exceeded.");
+      numBlocksFailedToCache.incrementAndGet();
       return;
     }
     try {
@@ -413,6 +426,7 @@ public class FsDatasetCache {
         if (mappableBlock != null) {
           mappableBlock.close();
         }
+        numBlocksFailedToCache.incrementAndGet();
       }
     }
   }
@@ -449,7 +463,7 @@ public class FsDatasetCache {
     }
   }
 
-  // Stats related methods for FsDatasetMBean
+  // Stats related methods for FSDatasetMBean
 
   /**
    * Get the approximate amount of cache space used.
@@ -464,4 +478,13 @@ public class FsDatasetCache {
   public long getDnCacheCapacity() {
     return maxBytes;
   }
+
+  public long getNumBlocksFailedToCache() {
+    return numBlocksFailedToCache.get();
+  }
+
+  public long getNumBlocksFailedToUncache() {
+    return numBlocksFailedToUncache.get();
+  }
+
 }
@@ -290,22 +290,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return volumes.numberOfFailedVolumes();
   }
 
   /**
    * Returns the total cache used by the datanode (in bytes).
    */
   @Override // FSDatasetMBean
-  public long getDnCacheUsed() {
+  public long getCacheUsed() {
     return cacheManager.getDnCacheUsed();
   }
 
   /**
    * Returns the total cache capacity of the datanode (in bytes).
    */
   @Override // FSDatasetMBean
-  public long getDnCacheCapacity() {
+  public long getCacheCapacity() {
     return cacheManager.getDnCacheCapacity();
   }
 
+  @Override // FSDatasetMBean
+  public long getNumBlocksFailedToCache() {
+    return cacheManager.getNumBlocksFailedToCache();
+  }
+
+  @Override // FSDatasetMBean
+  public long getNumBlocksFailedToUncache() {
+    return cacheManager.getNumBlocksFailedToUncache();
+  }
+
   /**
    * Find the block's on-disk length
    */
@@ -1193,28 +1197,36 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     synchronized (this) {
       ReplicaInfo info = volumeMap.get(bpid, blockId);
-      if (info == null) {
-        LOG.warn("Failed to cache block with id " + blockId + ", pool " +
-            bpid + ": ReplicaInfo not found.");
-        return;
-      }
-      if (info.getState() != ReplicaState.FINALIZED) {
-        LOG.warn("Failed to cache block with id " + blockId + ", pool " +
-            bpid + ": replica is not finalized; it is in state " +
-            info.getState());
-        return;
-      }
+      boolean success = false;
       try {
-        volume = (FsVolumeImpl)info.getVolume();
-        if (volume == null) {
+        if (info == null) {
           LOG.warn("Failed to cache block with id " + blockId + ", pool " +
-              bpid + ": volume not found.");
+              bpid + ": ReplicaInfo not found.");
           return;
         }
-      } catch (ClassCastException e) {
-        LOG.warn("Failed to cache block with id " + blockId +
-            ": volume was not an instance of FsVolumeImpl.");
-        return;
+        if (info.getState() != ReplicaState.FINALIZED) {
+          LOG.warn("Failed to cache block with id " + blockId + ", pool " +
+              bpid + ": replica is not finalized; it is in state " +
+              info.getState());
+          return;
+        }
+        try {
+          volume = (FsVolumeImpl)info.getVolume();
+          if (volume == null) {
+            LOG.warn("Failed to cache block with id " + blockId + ", pool " +
+                bpid + ": volume not found.");
+            return;
+          }
+        } catch (ClassCastException e) {
+          LOG.warn("Failed to cache block with id " + blockId +
+              ": volume was not an instance of FsVolumeImpl.");
+          return;
+        }
+        success = true;
+      } finally {
+        if (!success) {
+          cacheManager.numBlocksFailedToCache.incrementAndGet();
+        }
       }
       blockFileName = info.getBlockFile().getAbsolutePath();
       length = info.getVisibleLength();
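[Editor's note] The refactor above wraps the existing replica/volume validation in a try/finally guarded by a success flag, so every early-return failure path increments numBlocksFailedToCache exactly once without touching each return statement. The pattern in miniature; replicaLooksValid() is a hypothetical stand-in for the checks the real code inlines:

    boolean success = false;
    try {
      if (!replicaLooksValid()) {
        return;               // the finally block still runs and counts this
      }
      success = true;
    } finally {
      if (!success) {
        numBlocksFailedToCache.incrementAndGet();
      }
    }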
@@ -79,12 +79,22 @@ public interface FSDatasetMBean {
   public int getNumFailedVolumes();
 
   /**
-   * Returns the total cache used by the datanode (in bytes).
+   * Returns the amount of cache used by the datanode (in bytes).
    */
-  public long getDnCacheUsed();
+  public long getCacheUsed();
 
   /**
    * Returns the total cache capacity of the datanode (in bytes).
    */
-  public long getDnCacheCapacity();
+  public long getCacheCapacity();
+
+  /**
+   * Returns the number of blocks that the datanode was unable to cache
+   */
+  public long getNumBlocksFailedToCache();
+
+  /**
+   * Returns the number of blocks that the datanode was unable to uncache
+   */
+  public long getNumBlocksFailedToUncache();
 }
@@ -188,8 +188,8 @@ message HeartbeatRequestProto {
   optional uint32 xmitsInProgress = 3 [ default = 0 ];
   optional uint32 xceiverCount = 4 [ default = 0 ];
   optional uint32 failedVolumes = 5 [ default = 0 ];
-  optional uint64 dnCacheCapacity = 6 [ default = 0 ];
-  optional uint64 dnCacheUsed = 7 [default = 0 ];
+  optional uint64 cacheCapacity = 6 [ default = 0 ];
+  optional uint64 cacheUsed = 7 [default = 0 ];
 }
 
 message StorageReportProto {
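[Editor's note] The renamed protobuf fields keep their tag numbers (6 and 7) and defaults, so the heartbeat wire format is unchanged and mixed-version nodes still interoperate; only the generated accessors change (setDnCacheCapacity becomes setCacheCapacity, and so on), which is why the two protocol translator classes are updated in lockstep in this same commit.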
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -497,12 +496,22 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override // FSDatasetMBean
-  public long getDnCacheUsed() {
+  public long getCacheUsed() {
     return 0l;
   }
 
   @Override // FSDatasetMBean
-  public long getDnCacheCapacity() {
+  public long getCacheCapacity() {
     return 0l;
   }
 
+  @Override
+  public long getNumBlocksFailedToCache() {
+    return 0l;
+  }
+
+  @Override
+  public long getNumBlocksFailedToUncache() {
+    return 0l;
+  }
+
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static junit.framework.Assert.assertTrue;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assume.assumeTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doReturn;
 
 import java.io.FileInputStream;
@@ -57,14 +59,15 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
 import org.apache.log4j.Logger;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 
 public class TestFsDatasetCache {
@@ -94,6 +97,7 @@ public class TestFsDatasetCache {
     conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
         CACHE_CAPACITY);
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
 
     cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(1).build();
@@ -187,7 +191,7 @@ public class TestFsDatasetCache {
 
       @Override
       public Boolean get() {
-        long curDnCacheUsed = fsd.getDnCacheUsed();
+        long curDnCacheUsed = fsd.getCacheUsed();
        if (curDnCacheUsed != expected) {
           if (tries++ > 10) {
             LOG.info("verifyExpectedCacheUsage: expected " +
@@ -222,22 +226,37 @@ public class TestFsDatasetCache {
     final long[] blockSizes = getBlockSizes(locs);
 
     // Check initial state
-    final long cacheCapacity = fsd.getDnCacheCapacity();
-    long cacheUsed = fsd.getDnCacheUsed();
+    final long cacheCapacity = fsd.getCacheCapacity();
+    long cacheUsed = fsd.getCacheUsed();
     long current = 0;
     assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
     assertEquals("Unexpected amount of cache used", current, cacheUsed);
 
+    MetricsRecordBuilder dnMetrics;
+    long numCacheCommands = 0;
+    long numUncacheCommands = 0;
+
     // Cache each block in succession, checking each time
     for (int i=0; i<NUM_BLOCKS; i++) {
       setHeartbeatResponse(cacheBlock(locs[i]));
       current = verifyExpectedCacheUsage(current + blockSizes[i]);
+      dnMetrics = getMetrics(dn.getMetrics().name());
+      long cmds = MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
+      assertTrue("Expected more cache requests from the NN ("
+          + cmds + " <= " + numCacheCommands + ")",
+          cmds > numCacheCommands);
+      numCacheCommands = cmds;
     }
 
     // Uncache each block in succession, again checking each time
     for (int i=0; i<NUM_BLOCKS; i++) {
       setHeartbeatResponse(uncacheBlock(locs[i]));
       current = verifyExpectedCacheUsage(current - blockSizes[i]);
+      dnMetrics = getMetrics(dn.getMetrics().name());
+      long cmds = MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
+      assertTrue("Expected more uncache requests from the NN",
+          cmds > numUncacheCommands);
+      numUncacheCommands = cmds;
     }
     LOG.info("finishing testCacheAndUncacheBlock");
   }
@@ -293,6 +312,9 @@ public class TestFsDatasetCache {
         return lines > 0;
       }
     }, 500, 30000);
+    // Also check the metrics for the failure
+    assertTrue("Expected more than 0 failed cache attempts",
+        fsd.getNumBlocksFailedToCache() > 0);
 
     // Uncache the n-1 files
     for (int i=0; i<numFiles-1; i++) {
@@ -322,8 +344,8 @@ public class TestFsDatasetCache {
     final long[] blockSizes = getBlockSizes(locs);
 
     // Check initial state
-    final long cacheCapacity = fsd.getDnCacheCapacity();
-    long cacheUsed = fsd.getDnCacheUsed();
+    final long cacheCapacity = fsd.getCacheCapacity();
+    long cacheUsed = fsd.getCacheUsed();
     long current = 0;
     assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
     assertEquals("Unexpected amount of cache used", current, cacheUsed);
@@ -354,4 +376,24 @@ public class TestFsDatasetCache {
     current = verifyExpectedCacheUsage(0);
     LOG.info("finishing testUncachingBlocksBeforeCachingFinishes");
   }
+
+  @Test(timeout=60000)
+  public void testUncacheUnknownBlock() throws Exception {
+    // Create a file
+    Path fileName = new Path("/testUncacheUnknownBlock");
+    int fileLen = 4096;
+    DFSTestUtil.createFile(fs, fileName, fileLen, (short)1, 0xFDFD);
+    HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(
+        fileName, 0, fileLen);
+
+    // Try to uncache it without caching it first
+    setHeartbeatResponse(uncacheBlocks(locs));
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return fsd.getNumBlocksFailedToUncache() > 0;
+      }
+    }, 100, 10000);
+  }
 }
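[Editor's note] Outside of tests, the new counters are reachable like any other FSDatasetMBean attribute. A hedged illustration for a probe running inside the DataNode JVM; the ObjectName pattern follows Hadoop's usual FSDatasetState naming and the attribute spellings are derived from the getter names, both assumptions rather than facts from this commit:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class CacheMetricsProbe {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        // Assumed ObjectName pattern: Hadoop:service=DataNode,name=FSDatasetState-<id>
        for (ObjectName name : mbs.queryNames(
            new ObjectName("Hadoop:service=DataNode,name=FSDatasetState*"), null)) {
          // JMX derives attribute names from the getters added in this commit.
          System.out.println(name
              + " NumBlocksFailedToCache="
              + mbs.getAttribute(name, "NumBlocksFailedToCache")
              + " NumBlocksFailedToUncache="
              + mbs.getAttribute(name, "NumBlocksFailedToUncache"));
        }
      }
    }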