diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 440b7e82cc4..526ddff8b5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2816,6 +2816,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9842. dfs.datanode.balance.bandwidthPerSec should accept friendly
     size units. (Lin Yiqun via Arpit Agarwal)
 
+    HDFS-9549. TestCacheDirectives#testExceedsCapacity is flaky (Xiao Chen via
+    cmccabe)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index 2f81ddfad74..87cd7169ef8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -27,6 +27,7 @@ import java.util.Date;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
@@ -491,6 +492,26 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    * Blocks that are over-replicated should be removed from Datanodes.
    */
   private void rescanCachedBlockMap() {
+    // Remove pendingCached blocks that will make DN out-of-capacity.
+    Set<DatanodeDescriptor> datanodes =
+        blockManager.getDatanodeManager().getDatanodes();
+    for (DatanodeDescriptor dn : datanodes) {
+      long remaining = dn.getCacheRemaining();
+      for (Iterator<CachedBlock> it = dn.getPendingCached().iterator();
+           it.hasNext();) {
+        CachedBlock cblock = it.next();
+        BlockInfo blockInfo = blockManager.
+            getStoredBlock(new Block(cblock.getBlockId()));
+        if (blockInfo.getNumBytes() > remaining) {
+          LOG.debug("Block {}: removing from PENDING_CACHED for node {} "
+                  + "because it cannot fit in remaining cache size {}.",
+              cblock.getBlockId(), dn.getDatanodeUuid(), remaining);
+          it.remove();
+        } else {
+          remaining -= blockInfo.getNumBytes();
+        }
+      }
+    }
     for (Iterator<CachedBlock> cbIter = cachedBlocks.iterator();
         cbIter.hasNext(); ) {
       scannedBlocks++;
@@ -531,7 +552,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
           DatanodeDescriptor datanode = iter.next();
           datanode.getPendingCached().remove(cblock);
           iter.remove();
-          LOG.trace("Block {}: removing from PENDING_CACHED for node {}"
+          LOG.trace("Block {}: removing from PENDING_CACHED for node {} "
                   + "because we already have {} cached replicas and we only" +
                   " need {}",
               cblock.getBlockId(), datanode.getDatanodeUuid(), numCached,
@@ -686,8 +707,8 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       long pendingCapacity = pendingBytes + datanode.getCacheRemaining();
       if (pendingCapacity < blockInfo.getNumBytes()) {
         LOG.trace("Block {}: DataNode {} is not a valid possibility " +
-            "because the block has size {}, but the DataNode only has {}" +
-            "bytes of cache remaining ({} pending bytes, {} already cached.",
+            "because the block has size {}, but the DataNode only has {} " +
+            "bytes of cache remaining ({} pending bytes, {} already cached.)",
             blockInfo.getBlockId(), datanode.getDatanodeUuid(),
             blockInfo.getNumBytes(), pendingCapacity, pendingBytes,
             datanode.getCacheRemaining());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index d344ca66225..999c1fa26f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -413,6 +413,15 @@ public class DatanodeManager {
     return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort);
   }
 
+  /** @return the datanode descriptors for all nodes. */
+  public Set<DatanodeDescriptor> getDatanodes() {
+    final Set<DatanodeDescriptor> datanodes;
+    synchronized (this) {
+      datanodes = new HashSet<>(datanodeMap.values());
+    }
+    return datanodes;
+  }
+
   /** @return the Host2NodesMap */
   public Host2NodesMap getHost2DatanodeMap() {
     return this.host2DatanodeMap;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
index 45d819e50a9..3793cae766e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
@@ -72,7 +72,6 @@ import org.apache.hadoop.hdfs.protocol.CachePoolStats;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -86,9 +85,6 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.GSet;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -1470,6 +1466,7 @@ public class TestCacheDirectives {
    */
   private void checkPendingCachedEmpty(MiniDFSCluster cluster)
       throws Exception {
+    Thread.sleep(1000);
     cluster.getNamesystem().readLock();
     try {
       final DatanodeManager datanodeManager =
@@ -1501,7 +1498,6 @@ public class TestCacheDirectives {
     waitForCachedBlocks(namenode, -1, numCachedReplicas,
         "testExceeds:1");
     checkPendingCachedEmpty(cluster);
-    Thread.sleep(1000);
     checkPendingCachedEmpty(cluster);
 
     // Try creating a file with giant-sized blocks that exceed cache capacity
@@ -1509,7 +1505,6 @@ public class TestCacheDirectives {
     DFSTestUtil.createFile(dfs, fileName, 4096, fileLen, CACHE_CAPACITY * 2,
         (short) 1, 0xFADED);
     checkPendingCachedEmpty(cluster);
-    Thread.sleep(1000);
     checkPendingCachedEmpty(cluster);
   }
 
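
For readers following the new loop in rescanCachedBlockMap(): it walks each DataNode's PENDING_CACHED list in order, drops any block that no longer fits in the node's remaining cache capacity, and charges each block that does fit against that capacity. Below is a minimal, self-contained sketch of the same check, not part of the patch: it uses plain long block sizes instead of the HDFS CachedBlock/DatanodeDescriptor types, and the class and method names (PendingCacheCapacityCheck, pruneOverCapacity) are illustrative only.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PendingCacheCapacityCheck {
  /**
   * Drop pending block sizes that can no longer fit in the node's remaining
   * cache capacity; each block that still fits consumes capacity in visit order.
   */
  static void pruneOverCapacity(List<Long> pendingBlockSizes, long cacheRemaining) {
    long remaining = cacheRemaining;
    for (Iterator<Long> it = pendingBlockSizes.iterator(); it.hasNext();) {
      long size = it.next();
      if (size > remaining) {
        it.remove();           // would exceed remaining capacity: un-schedule it
      } else {
        remaining -= size;     // fits: reserve the space for this block
      }
    }
  }

  public static void main(String[] args) {
    List<Long> pending = new ArrayList<>(Arrays.asList(40L, 70L, 30L));
    pruneOverCapacity(pending, 100L);
    System.out.println(pending);  // [40, 30]: the 70-byte block is dropped
  }
}

The greedy, in-order check mirrors what the patch does inside the CacheReplicationMonitor scan; it is reduced here to a list of sizes so the behaviour can be seen in isolation.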