diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1918cac2e2a..637c77404c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -364,6 +364,8 @@ Trunk (Unreleased)
     HDFS-5482. DistributedFileSystem#listPathBasedCacheDirectives must support
     relative paths. (Colin Patrick McCabe via cnauroth)
 
+    HDFS-5320. Add datanode caching metrics. (wang)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index f12a92ff0ec..ef44e0cd158 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -155,7 +155,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
 
   @Override
   public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
-      StorageReport[] reports, long dnCacheCapacity, long dnCacheUsed,
+      StorageReport[] reports, long cacheCapacity, long cacheUsed,
       int xmitsInProgress, int xceiverCount, int failedVolumes)
       throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
@@ -165,11 +165,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements
     for (StorageReport r : reports) {
       builder.addReports(PBHelper.convert(r));
     }
-    if (dnCacheCapacity != 0) {
-      builder.setDnCacheCapacity(dnCacheCapacity);
+    if (cacheCapacity != 0) {
+      builder.setCacheCapacity(cacheCapacity);
     }
-    if (dnCacheUsed != 0) {
-      builder.setDnCacheUsed(dnCacheUsed);
+    if (cacheUsed != 0) {
+      builder.setCacheUsed(cacheUsed);
     }
     HeartbeatResponseProto resp;
     try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index ab067709519..3f0c437c90c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -57,7 +57,6 @@
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 
-import com.google.common.primitives.Longs;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
@@ -112,7 +111,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
             p.getBlockPoolUsed());
       }
       response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
-          report, request.getDnCacheCapacity(), request.getDnCacheUsed(),
+          report, request.getCacheCapacity(), request.getCacheUsed(),
           request.getXmitsInProgress(), request.getXceiverCount(),
           request.getFailedVolumes());
     } catch (IOException e) {
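The two translator hunks above only set the renamed heartbeat fields when they are non-zero, because the .proto declares them as optional uint64 with a default of 0. A minimal sketch of that behaviour against the generated HeartbeatRequestProto builder; the values and the use of buildPartial() are illustrative only, and the required registration/report fields are omitted here:

```java
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;

public class CacheHeartbeatFieldsSketch {
  public static void main(String[] args) {
    HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder();
    // Only set the optional cache fields when caching is actually configured.
    long cacheCapacity = 64 * 1024 * 1024;   // hypothetical values
    long cacheUsed = 4 * 1024 * 1024;
    if (cacheCapacity != 0) {
      builder.setCacheCapacity(cacheCapacity);
    }
    if (cacheUsed != 0) {
      builder.setCacheUsed(cacheUsed);
    }
    // buildPartial() is used because the required registration/report fields
    // are left out of this sketch.
    HeartbeatRequestProto partial = builder.buildPartial();
    // Readers fall back to the declared default of 0 when a field was never set.
    System.out.println(partial.getCacheCapacity() + " / " + partial.getCacheUsed());
  }
}
```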
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 73d3cdffadb..9d2b36d9823 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -435,7 +435,7 @@ class BPServiceActor implements Runnable {
 
   DatanodeCommand cacheReport() throws IOException {
     // If caching is disabled, do not send a cache report
-    if (dn.getFSDataset().getDnCacheCapacity() == 0) {
+    if (dn.getFSDataset().getCacheCapacity() == 0) {
       return null;
     }
     // send cache report if timer has expired.
@@ -475,8 +475,8 @@ class BPServiceActor implements Runnable {
         dn.getFSDataset().getRemaining(),
         dn.getFSDataset().getBlockPoolUsed(bpos.getBlockPoolId())) };
     return bpNamenode.sendHeartbeat(bpRegistration, report,
-        dn.getFSDataset().getDnCacheCapacity(),
-        dn.getFSDataset().getDnCacheUsed(),
+        dn.getFSDataset().getCacheCapacity(),
+        dn.getFSDataset().getCacheUsed(),
         dn.getXmitsInProgress(),
         dn.getXceiverCount(),
         dn.getFSDataset().getNumFailedVolumes());
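With the BPServiceActor changes above, cache reports are skipped entirely while caching is disabled (cache capacity 0), while cache capacity and usage ride along on every heartbeat. A condensed sketch of that reporting call, with the surrounding actor state reduced to parameters; everything except the sendHeartbeat signature and the two dataset getters is a stand-in:

```java
import java.io.IOException;

import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;

class HeartbeatSketch {
  // Sketch only: the parameters stand in for the real BPServiceActor state.
  static HeartbeatResponse sendWithCacheStats(
      DatanodeProtocolClientSideTranslatorPB namenode,
      DatanodeRegistration registration, StorageReport[] report,
      FsDatasetSpi<?> dataset, int xmits, int xceivers, int failedVolumes)
      throws IOException {
    return namenode.sendHeartbeat(registration, report,
        dataset.getCacheCapacity(),  // total locked-memory budget, in bytes
        dataset.getCacheUsed(),      // bytes currently mapped and mlocked
        xmits, xceivers, failedVolumes);
  }
}
```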
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 2af46bb8915..3e12168e22d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -226,6 +226,15 @@ public class FsDatasetCache {
    */
   private final long maxBytes;
+  /**
+   * Number of cache commands that could not be completed successfully
+   */
+  AtomicLong numBlocksFailedToCache = new AtomicLong(0);
+  /**
+   * Number of uncache commands that could not be completed successfully
+   */
+  AtomicLong numBlocksFailedToUncache = new AtomicLong(0);
+
   public FsDatasetCache(FsDatasetImpl dataset) {
     this.dataset = dataset;
     this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory();
   }
@@ -274,6 +283,7 @@ public class FsDatasetCache {
             " already exists in the FsDatasetCache with state " +
             prevValue.state);
       }
+      numBlocksFailedToCache.incrementAndGet();
       return;
     }
     mappableBlockMap.put(key, new Value(null, State.CACHING));
@@ -291,6 +301,7 @@ public class FsDatasetCache {
             "does not need to be uncached, because it is not currently " +
             "in the mappableBlockMap.");
       }
+      numBlocksFailedToUncache.incrementAndGet();
       return;
     }
     switch (prevValue.state) {
@@ -317,6 +328,7 @@ public class FsDatasetCache {
             "does not need to be uncached, because it is " +
             "in state " + prevValue.state + ".");
       }
+      numBlocksFailedToUncache.incrementAndGet();
       break;
     }
   }
@@ -349,7 +361,8 @@ public class FsDatasetCache {
         LOG.warn("Failed to cache block id " + key.id + ", pool " + key.bpid +
             ": could not reserve " + length + " more bytes in the " +
             "cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
-            " of " + maxBytes + " exceeded.");
+            " of " + maxBytes + " exceeded.");
+        numBlocksFailedToCache.incrementAndGet();
         return;
       }
       try {
@@ -413,6 +426,7 @@ public class FsDatasetCache {
         if (mappableBlock != null) {
           mappableBlock.close();
         }
+        numBlocksFailedToCache.incrementAndGet();
       }
     }
   }
@@ -449,7 +463,7 @@ public class FsDatasetCache {
     }
   }
 
-  // Stats related methods for FsDatasetMBean
+  // Stats related methods for FSDatasetMBean
 
   /**
    * Get the approximate amount of cache space used.
@@ -464,4 +478,13 @@ public class FsDatasetCache {
   public long getDnCacheCapacity() {
     return maxBytes;
   }
+
+  public long getNumBlocksFailedToCache() {
+    return numBlocksFailedToCache.get();
+  }
+
+  public long getNumBlocksFailedToUncache() {
+    return numBlocksFailedToUncache.get();
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 65f57712ec3..d8c0155f719 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -290,22 +290,26 @@ class FsDatasetImpl implements FsDatasetSpi {
     return volumes.numberOfFailedVolumes();
   }
 
-  /**
-   * Returns the total cache used by the datanode (in bytes).
-   */
   @Override // FSDatasetMBean
-  public long getDnCacheUsed() {
+  public long getCacheUsed() {
     return cacheManager.getDnCacheUsed();
   }
 
-  /**
-   * Returns the total cache capacity of the datanode (in bytes).
-   */
   @Override // FSDatasetMBean
-  public long getDnCacheCapacity() {
+  public long getCacheCapacity() {
     return cacheManager.getDnCacheCapacity();
   }
 
+  @Override // FSDatasetMBean
+  public long getNumBlocksFailedToCache() {
+    return cacheManager.getNumBlocksFailedToCache();
+  }
+
+  @Override // FSDatasetMBean
+  public long getNumBlocksFailedToUncache() {
+    return cacheManager.getNumBlocksFailedToUncache();
+  }
+
   /**
    * Find the block's on-disk length
@@ -1193,28 +1197,36 @@ class FsDatasetImpl implements FsDatasetSpi {
     synchronized (this) {
       ReplicaInfo info = volumeMap.get(bpid, blockId);
-      if (info == null) {
-        LOG.warn("Failed to cache block with id " + blockId + ", pool " +
-            bpid + ": ReplicaInfo not found.");
-        return;
-      }
-      if (info.getState() != ReplicaState.FINALIZED) {
-        LOG.warn("Failed to cache block with id " + blockId + ", pool " +
-            bpid + ": replica is not finalized; it is in state " +
-            info.getState());
-        return;
-      }
+      boolean success = false;
       try {
-        volume = (FsVolumeImpl)info.getVolume();
-        if (volume == null) {
+        if (info == null) {
           LOG.warn("Failed to cache block with id " + blockId + ", pool " +
-              bpid + ": volume not found.");
+              bpid + ": ReplicaInfo not found.");
           return;
         }
-      } catch (ClassCastException e) {
-        LOG.warn("Failed to cache block with id " + blockId +
-            ": volume was not an instance of FsVolumeImpl.");
-        return;
+        if (info.getState() != ReplicaState.FINALIZED) {
+          LOG.warn("Failed to cache block with id " + blockId + ", pool " +
+              bpid + ": replica is not finalized; it is in state " +
+              info.getState());
+          return;
+        }
+        try {
+          volume = (FsVolumeImpl)info.getVolume();
+          if (volume == null) {
+            LOG.warn("Failed to cache block with id " + blockId + ", pool " +
+                bpid + ": volume not found.");
+            return;
+          }
+        } catch (ClassCastException e) {
+          LOG.warn("Failed to cache block with id " + blockId +
+              ": volume was not an instance of FsVolumeImpl.");
+          return;
+        }
+        success = true;
+      } finally {
+        if (!success) {
+          cacheManager.numBlocksFailedToCache.incrementAndGet();
+        }
       }
       blockFileName = info.getBlockFile().getAbsolutePath();
       length = info.getVisibleLength();
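FsDatasetImpl surfaces the two new failure counters through FSDatasetMBean (next file), so they can be read over JMX next to the renamed CacheCapacity/CacheUsed attributes. A hedged sketch of polling them from the platform MBean server; the object-name pattern is an assumption about how the bean is registered, not something stated in this patch:

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DatasetCacheMetricsReader {
  public static void main(String[] args) throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    // Assumed bean name pattern; the dataset bean is registered under the
    // DataNode service as "FSDatasetState-<storage id>".
    ObjectName pattern =
        new ObjectName("Hadoop:service=DataNode,name=FSDatasetState-*");
    for (ObjectName name : mbs.queryNames(pattern, null)) {
      System.out.println(name
          + " CacheCapacity=" + mbs.getAttribute(name, "CacheCapacity")
          + " CacheUsed=" + mbs.getAttribute(name, "CacheUsed")
          + " NumBlocksFailedToCache="
          + mbs.getAttribute(name, "NumBlocksFailedToCache")
          + " NumBlocksFailedToUncache="
          + mbs.getAttribute(name, "NumBlocksFailedToUncache"));
    }
  }
}
```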
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
index 82757d065bb..40ccefb6c3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
@@ -79,12 +79,22 @@ public interface FSDatasetMBean {
   public int getNumFailedVolumes();
 
   /**
-   * Returns the total cache used by the datanode (in bytes).
+   * Returns the amount of cache used by the datanode (in bytes).
    */
-  public long getDnCacheUsed();
+  public long getCacheUsed();
 
   /**
    * Returns the total cache capacity of the datanode (in bytes).
    */
-  public long getDnCacheCapacity();
+  public long getCacheCapacity();
+
+  /**
+   * Returns the number of blocks that the datanode was unable to cache
+   */
+  public long getNumBlocksFailedToCache();
+
+  /**
+   * Returns the number of blocks that the datanode was unable to uncache
+   */
+  public long getNumBlocksFailedToUncache();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index d64d97abca3..28077a85ff2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -188,8 +188,8 @@ message HeartbeatRequestProto {
   optional uint32 xmitsInProgress = 3 [ default = 0 ];
   optional uint32 xceiverCount = 4 [ default = 0 ];
   optional uint32 failedVolumes = 5 [ default = 0 ];
-  optional uint64 dnCacheCapacity = 6 [ default = 0 ];
-  optional uint64 dnCacheUsed = 7 [default = 0 ];
+  optional uint64 cacheCapacity = 6 [ default = 0 ];
+  optional uint64 cacheUsed = 7 [default = 0 ];
 }
 
 message StorageReportProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index d5df755167c..a855f126420 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -497,12 +496,22 @@ public class SimulatedFSDataset implements FsDatasetSpi {
   }
 
   @Override // FSDatasetMBean
-  public long getDnCacheUsed() {
+  public long getCacheUsed() {
     return 0l;
   }
 
   @Override // FSDatasetMBean
-  public long getDnCacheCapacity() {
+  public long getCacheCapacity() {
+    return 0l;
+  }
+
+  @Override
+  public long getNumBlocksFailedToCache() {
+    return 0l;
+  }
+
+  @Override
+  public long getNumBlocksFailedToUncache() {
     return 0l;
   }
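The test changes that follow pull in org.apache.hadoop.test.MetricsAsserts to watch datanode metrics while blocks are cached and uncached. A small sketch of that assertion pattern; the counter name "BlocksCached" is hypothetical, since the names actually checked by the test were lost in the garbled hunks below:

```java
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;

class DnMetricsCheckSketch {
  // Sketch: poll the datanode's metrics source and assert a counter moved.
  // "BlocksCached" is an assumed counter name, not taken from the patch.
  static void assertCacheCommandsSeen(DataNode dn, long previous) {
    MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
    long cmds = getLongCounter("BlocksCached", dnMetrics);
    assertTrue("Expected more cache commands than " + previous, cmds > previous);
  }
}
```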
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index e889413ec47..7f5a9101b65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static junit.framework.Assert.assertTrue;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assume.assumeTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doReturn;
 
 import java.io.FileInputStream;
@@ -57,14 +59,15 @@
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
 import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 
@@ -94,6 +97,7 @@ public class TestFsDatasetCache {
     conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
         CACHE_CAPACITY);
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
 
     cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(1).build();
@@ -187,7 +191,7 @@ public class TestFsDatasetCache {
       @Override
       public Boolean get() {
-        long curDnCacheUsed = fsd.getDnCacheUsed();
+        long curDnCacheUsed = fsd.getCacheUsed();
         if (curDnCacheUsed != expected) {
           if (tries++ > 10) {
             LOG.info("verifyExpectedCacheUsage: expected " +
@@ -222,22 +226,37 @@ public class TestFsDatasetCache {
     final long[] blockSizes = getBlockSizes(locs);
 
     // Check initial state
-    final long cacheCapacity = fsd.getDnCacheCapacity();
-    long cacheUsed = fsd.getDnCacheUsed();
+    final long cacheCapacity = fsd.getCacheCapacity();
+    long cacheUsed = fsd.getCacheUsed();
     long current = 0;
     assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
     assertEquals("Unexpected amount of cache used", current, cacheUsed);
 
+    MetricsRecordBuilder dnMetrics;
+    long numCacheCommands = 0;
+    long numUncacheCommands = 0;
+
     // Cache each block in succession, checking each time
     for (int i=0; i numCacheCommands);
+      numCacheCommands = cmds;
     }
 
     // Uncache each block in succession, again checking each time
     for (int i=0; i numUncacheCommands);
+      numUncacheCommands = cmds;
     }
     LOG.info("finishing testCacheAndUncacheBlock");
   }
@@ -293,6 +312,9 @@ public class TestFsDatasetCache {
           return lines > 0;
         }
       }, 500, 30000);
+      // Also check the metrics for the failure
+      assertTrue("Expected more than 0 failed cache attempts",
+          fsd.getNumBlocksFailedToCache() > 0);
 
       // Uncache the n-1 files
       for (int i=0; i() {
+        @Override
+        public Boolean get() {
+          return fsd.getNumBlocksFailedToUncache() > 0;
+        }
+      }, 100, 10000);
+    }
 }
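Parts of the final test hunks are garbled, but the surviving fragments show the test polling the new failure counter with GenericTestUtils.waitFor. A sketch of that polling idiom, reconstructed around the surviving lines and using the same fsd handle as the rest of the test:

```java
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.test.GenericTestUtils;

import com.google.common.base.Supplier;

class WaitForUncacheFailureSketch {
  // Poll every 100 ms, for at most 10 s, until at least one uncache failure
  // has been counted by the dataset.
  static void waitForUncacheFailure(final FsDatasetSpi<?> fsd)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return fsd.getNumBlocksFailedToUncache() > 0;
      }
    }, 100, 10000);
  }
}
```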