diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2df555aea17..cf3c75f1c0e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -445,6 +445,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5981. PBImageXmlWriter generates malformed XML. (Haohui Mai via
     cnauroth)
 
+    HDFS-6107. When a block can't be cached due to limited space on the
+    DataNode, that block becomes uncacheable (cmccabe)
+
   BREAKDOWN OF HDFS-4685 SUBTASKS AND RELATED JIRAS
 
     HDFS-5596. Implement RPC stubs. (Haohui Mai via cnauroth)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 84bc6571b63..2977bfcac6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -598,14 +598,12 @@ class BPOfferService {
         blockIdCmd.getBlockPoolId() + " of [" +
         blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
       dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
-      dn.metrics.incrBlocksCached(blockIdCmd.getBlockIds().length);
       break;
     case DatanodeProtocol.DNA_UNCACHE:
       LOG.info("DatanodeCommand action: DNA_UNCACHE for " +
         blockIdCmd.getBlockPoolId() + " of [" +
         blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
       dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
-      dn.metrics.incrBlocksUncached(blockIdCmd.getBlockIds().length);
       break;
     case DatanodeProtocol.DNA_SHUTDOWN:
       // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 23acd47b752..524fda2d2f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1041,7 +1041,7 @@ public class DataNode extends Configured
     }
   }
 
-  DataNodeMetrics getMetrics() {
+  public DataNodeMetrics getMetrics() {
     return metrics;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 2061e88a759..277c2e7cb52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -337,15 +337,16 @@ public class FsDatasetCache {
       ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
           key.getBlockId(), length, genstamp);
       long newUsedBytes = usedBytesCount.reserve(length);
-      if (newUsedBytes < 0) {
-        LOG.warn("Failed to cache " + key + ": could not reserve " + length +
-            " more bytes in the cache: " +
-            DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
-            " of " + maxBytes + " exceeded.");
-        numBlocksFailedToCache.incrementAndGet();
-        return;
-      }
+      boolean reservedBytes = false;
       try {
+        if (newUsedBytes < 0) {
+          LOG.warn("Failed to cache " + key + ": could not reserve " + length +
+              " more bytes in the cache: " +
+              DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
+              " of " + maxBytes + " exceeded.");
+          return;
+        }
+        reservedBytes = true;
         try {
           blockIn = (FileInputStream)dataset.getBlockInputStream(extBlk, 0);
           metaIn = (FileInputStream)dataset.getMetaDataInputStream(extBlk)
@@ -391,10 +392,13 @@ public class FsDatasetCache {
         }
         dataset.datanode.getShortCircuitRegistry().processBlockMlockEvent(key);
         numBlocksCached.addAndGet(1);
+        dataset.datanode.getMetrics().incrBlocksCached(1);
         success = true;
       } finally {
         if (!success) {
-          newUsedBytes = usedBytesCount.release(length);
+          if (reservedBytes) {
+            newUsedBytes = usedBytesCount.release(length);
+          }
           if (LOG.isDebugEnabled()) {
             LOG.debug("Caching of " + key + " was aborted. We are now " +
                 "caching only " + newUsedBytes + " + bytes in total.");
@@ -439,6 +443,7 @@ public class FsDatasetCache {
 
       long newUsedBytes = usedBytesCount.release(value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
+      dataset.datanode.getMetrics().incrBlocksUncached(1);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Uncaching of " + key + " completed. " +
             "usedBytes = " + newUsedBytes);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index c73f24f2068..3ac14e86c08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1143,6 +1143,11 @@ public class DFSTestUtil {
           }
           return false;
         }
+        LOG.info("verifyExpectedCacheUsage: got " +
+            curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " +
+            curBlocks + "/" + expectedBlocks + " blocks cached. " +
+            "memlock limit = " +
+            NativeIO.POSIX.getCacheManipulator().getMemlockLimit());
         return true;
       }
     }, 100, 60000);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index 82b10749322..e3945f17087 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -40,12 +40,15 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.LogVerificationAppender;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -80,6 +83,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 
 import com.google.common.base.Supplier;
+import com.google.common.primitives.Ints;
 
 public class TestFsDatasetCache {
   private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
@@ -349,10 +353,13 @@ public class TestFsDatasetCache {
         fsd.getNumBlocksFailedToCache() > 0);
 
     // Uncache the n-1 files
+    int curCachedBlocks = 16;
     for (int i=0; i
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+        long blocksCached =
+            MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
+        if (blocksCached != TOTAL_BLOCKS_PER_CACHE) {
+          LOG.info("waiting for " + TOTAL_BLOCKS_PER_CACHE + " to " +
+              "be cached. Right now only " + blocksCached + " blocks are cached.");
+          return false;
+        }
+        LOG.info(TOTAL_BLOCKS_PER_CACHE + " blocks are now cached.");
+        return true;
+      }
+    }, 1000, 30000);
+
+    // Try to cache a smaller file. It should fail.
+    final long shortCacheDirectiveId =
+      dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+        .setPool("pool").setPath(SMALL_FILE).setReplication((short)1).build());
+    Thread.sleep(10000);
+    MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+    Assert.assertEquals(TOTAL_BLOCKS_PER_CACHE,
+        MetricsAsserts.getLongCounter("BlocksCached", dnMetrics));
+
+    // Uncache the big file and verify that the small file can now be
+    // cached (regression test for HDFS-6107)
+    dfs.removeCacheDirective(bigCacheDirectiveId);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        RemoteIterator<CacheDirectiveEntry> iter;
+        try {
+          iter = dfs.listCacheDirectives(
+              new CacheDirectiveInfo.Builder().build());
+          CacheDirectiveEntry entry;
+          do {
+            entry = iter.next();
+          } while (entry.getInfo().getId() != shortCacheDirectiveId);
+          if (entry.getStats().getFilesCached() != 1) {
+            LOG.info("waiting for directive " + shortCacheDirectiveId +
+                " to be cached. stats = " + entry.getStats());
+            return false;
+          }
+          LOG.info("directive " + shortCacheDirectiveId + " has been cached.");
+        } catch (IOException e) {
+          Assert.fail("unexpected exception" + e.toString());
+        }
+        return true;
+      }
+    }, 1000, 30000);
+  }
 }
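Note on the fix (illustrative, not part of the patch): in the old FsDatasetCache code the reservation-failure path returned before ever entering the try/finally, so the shared failure cleanup never ran and, per the JIRA title, the block could never be cached afterwards. The patch moves the check inside the try and adds a reservedBytes flag so the finally releases bytes only when they were actually reserved; it also moves the BlocksCached/BlocksUncached metric updates out of BPOfferService and into FsDatasetCache so they count blocks that were really cached or uncached. The sketch below shows that reserve-inside-try pattern in isolation; CacheSketch, pending, and loadBlock are hypothetical stand-ins, not Hadoop APIs.

// Illustrative sketch only -- not Hadoop code. All names here
// (CacheSketch, pending, loadBlock) are made up for illustration.
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class CacheSketch {
  private final long maxBytes;
  private final AtomicLong usedBytes = new AtomicLong(0);
  private final AtomicLong blocksCached = new AtomicLong(0);
  // Stands in for the cache's per-block state map: entries must be cleaned
  // up on every failed attempt or the block can never be retried.
  private final Set<String> pending = ConcurrentHashMap.newKeySet();

  public CacheSketch(long maxBytes) {
    this.maxBytes = maxBytes;
  }

  // Atomically reserve length bytes; returns -1 without reserving anything
  // if the configured limit would be exceeded.
  private long reserve(long length) {
    while (true) {
      long cur = usedBytes.get();
      long next = cur + length;
      if (next > maxBytes) {
        return -1;
      }
      if (usedBytes.compareAndSet(cur, next)) {
        return next;
      }
    }
  }

  // Attempt to cache one block; loadBlock stands in for the real
  // mmap/mlock/checksum work and may throw.
  public boolean cacheBlock(String blockId, long length, Runnable loadBlock) {
    pending.add(blockId);
    long newUsedBytes = reserve(length);
    boolean reservedBytes = false;   // was the reservation actually taken?
    boolean success = false;
    try {
      if (newUsedBytes < 0) {
        // Returning from inside the try (instead of before it, as the old
        // code did) guarantees the cleanup in finally still runs.
        return false;
      }
      reservedBytes = true;
      loadBlock.run();
      blocksCached.incrementAndGet();  // count only blocks really cached
      success = true;
      return true;
    } finally {
      if (!success) {
        if (reservedBytes) {
          // Release only bytes that were actually reserved.
          usedBytes.addAndGet(-length);
        }
        // Shared failure cleanup now covers the reservation-failure path
        // too, so the block stays cacheable for a later attempt.
        pending.remove(blockId);
      }
      // (On success the real code instead marks the entry as cached.)
    }
  }

  public static void main(String[] args) {
    CacheSketch cache = new CacheSketch(2 * 4096L);
    System.out.println(cache.cacheBlock("b1", 4096, () -> { }));  // true
    System.out.println(cache.cacheBlock("b2", 4096, () -> { }));  // true
    // Cache is full: this fails, but "b3" is not left stuck in a
    // half-cached state and no bytes are released that were never reserved.
    System.out.println(cache.cacheBlock("b3", 4096, () -> { }));  // false
  }
}

With a two-block budget the third call fails to reserve, yet its pending entry is still cleared. That retry-ability is what the new regression test in TestFsDatasetCache exercises: it fills the cache, confirms the small file cannot be cached, then uncaches the big file and waits for the small file's directive to report one file cached.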