diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1086b8b4909..9128aa6525c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1799,6 +1799,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9842. dfs.datanode.balance.bandwidthPerSec should accept friendly
     size units. (Lin Yiqun via Arpit Agarwal)
 
+    HDFS-9549. TestCacheDirectives#testExceedsCapacity is flaky (Xiao Chen via
+    cmccabe)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index 2f81ddfad74..87cd7169ef8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -27,6 +27,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
@@ -491,6 +492,26 @@ private String findReasonForNotCaching(CachedBlock cblock,
    * Blocks that are over-replicated should be removed from Datanodes.
    */
   private void rescanCachedBlockMap() {
+    // Remove pendingCached blocks that will make DN out-of-capacity.
+    Set<DatanodeDescriptor> datanodes =
+        blockManager.getDatanodeManager().getDatanodes();
+    for (DatanodeDescriptor dn : datanodes) {
+      long remaining = dn.getCacheRemaining();
+      for (Iterator<CachedBlock> it = dn.getPendingCached().iterator();
+           it.hasNext();) {
+        CachedBlock cblock = it.next();
+        BlockInfo blockInfo = blockManager.
+            getStoredBlock(new Block(cblock.getBlockId()));
+        if (blockInfo.getNumBytes() > remaining) {
+          LOG.debug("Block {}: removing from PENDING_CACHED for node {} "
+              + "because it cannot fit in remaining cache size {}.",
+              cblock.getBlockId(), dn.getDatanodeUuid(), remaining);
+          it.remove();
+        } else {
+          remaining -= blockInfo.getNumBytes();
+        }
+      }
+    }
     for (Iterator<CachedBlock> cbIter = cachedBlocks.iterator();
         cbIter.hasNext(); ) {
       scannedBlocks++;
@@ -531,7 +552,7 @@ private void rescanCachedBlockMap() {
           DatanodeDescriptor datanode = iter.next();
           datanode.getPendingCached().remove(cblock);
           iter.remove();
-          LOG.trace("Block {}: removing from PENDING_CACHED for node {}"
+          LOG.trace("Block {}: removing from PENDING_CACHED for node {} "
               + "because we already have {} cached replicas and we only" +
               " need {}",
               cblock.getBlockId(), datanode.getDatanodeUuid(), numCached,
@@ -686,8 +707,8 @@ private void addNewPendingCached(final int neededCached,
       long pendingCapacity = pendingBytes + datanode.getCacheRemaining();
       if (pendingCapacity < blockInfo.getNumBytes()) {
         LOG.trace("Block {}: DataNode {} is not a valid possibility " +
-            "because the block has size {}, but the DataNode only has {}" +
-            "bytes of cache remaining ({} pending bytes, {} already cached.",
+            "because the block has size {}, but the DataNode only has {} " +
+            "bytes of cache remaining ({} pending bytes, {} already cached.)",
             blockInfo.getBlockId(), datanode.getDatanodeUuid(),
             blockInfo.getNumBytes(), pendingCapacity, pendingBytes,
             datanode.getCacheRemaining());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 9f878d1e00d..aa67a292e4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -416,6 +416,15 @@ public DatanodeDescriptor getDatanodeByXferAddr(String host, int xferPort) {
     return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort);
   }
 
+  /** @return the datanode descriptors for all nodes. */
+  public Set<DatanodeDescriptor> getDatanodes() {
+    final Set<DatanodeDescriptor> datanodes;
+    synchronized (this) {
+      datanodes = new HashSet<>(datanodeMap.values());
+    }
+    return datanodes;
+  }
+
   /** @return the Host2NodesMap */
   public Host2NodesMap getHost2DatanodeMap() {
     return this.host2DatanodeMap;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
index d6f526ba9c0..588f20953ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
@@ -73,7 +73,6 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -87,9 +86,6 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.GSet;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -1413,6 +1409,7 @@ public void testMaxRelativeExpiry() throws Exception {
    */
   private void checkPendingCachedEmpty(MiniDFSCluster cluster)
       throws Exception {
+    Thread.sleep(1000);
     cluster.getNamesystem().readLock();
     try {
       final DatanodeManager datanodeManager =
@@ -1444,7 +1441,6 @@ public void testExceedsCapacity() throws Exception {
     waitForCachedBlocks(namenode, -1, numCachedReplicas,
         "testExceeds:1");
     checkPendingCachedEmpty(cluster);
-    Thread.sleep(1000);
     checkPendingCachedEmpty(cluster);
 
     // Try creating a file with giant-sized blocks that exceed cache capacity
@@ -1452,7 +1448,6 @@ public void testExceedsCapacity() throws Exception {
     DFSTestUtil.createFile(dfs, fileName, 4096, fileLen, CACHE_CAPACITY * 2,
         (short) 1, 0xFADED);
     checkPendingCachedEmpty(cluster);
-    Thread.sleep(1000);
     checkPendingCachedEmpty(cluster);
   }
 
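
For reference, the core of the change to rescanCachedBlockMap() is a greedy prune of each DataNode's PENDING_CACHED list: walk the pending blocks and keep a block only while it still fits in the node's remaining cache budget, dropping it otherwise. The standalone sketch below only illustrates that loop with simplified, hypothetical types (plain block sizes in place of CachedBlock/BlockInfo and DatanodeDescriptor); it is not code from the patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Illustrative sketch of the capacity-based prune performed by the monitor. */
public class PendingCachePruneSketch {
  /**
   * Drop pending block sizes that can no longer fit in the remaining cache,
   * charging each kept block against the budget as the loop proceeds.
   */
  static void prune(List<Long> pendingBlockSizes, long cacheRemaining) {
    long remaining = cacheRemaining;
    for (Iterator<Long> it = pendingBlockSizes.iterator(); it.hasNext();) {
      long size = it.next();
      if (size > remaining) {
        it.remove();           // would exceed capacity: remove from pending
      } else {
        remaining -= size;     // fits: reserve the space and keep the block
      }
    }
  }

  public static void main(String[] args) {
    List<Long> pending = new ArrayList<>(Arrays.asList(64L, 512L, 128L));
    prune(pending, 256L);
    System.out.println(pending);   // prints [64, 128]
  }
}

In the actual monitor the budget is dn.getCacheRemaining() and the sizes come from blockInfo.getNumBytes(), as shown in the CacheReplicationMonitor hunk above.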