HDFS-9549. TestCacheDirectives#testExceedsCapacity is flaky (Xiao Chen via cmccabe)
(cherry picked from commit211c78c090
) (cherry picked from commit440379a0d4
)
This commit is contained in:
parent
fe43f57c2a
commit
7812cfdd81
|
@ -1799,6 +1799,9 @@ Release 2.8.0 - UNRELEASED
|
|||
HDFS-9842. dfs.datanode.balance.bandwidthPerSec should accept friendly
|
||||
size units. (Lin Yiqun via Arpit Agarwal)
|
||||
|
||||
HDFS-9549. TestCacheDirectives#testExceedsCapacity is flaky (Xiao Chen via
|
||||
cmccabe)
|
||||
|
||||
Release 2.7.3 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Date;
|
|||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.Random;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -491,6 +492,26 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|||
* Blocks that are over-replicated should be removed from Datanodes.
|
||||
*/
|
||||
private void rescanCachedBlockMap() {
|
||||
// Remove pendingCached blocks that will make DN out-of-capacity.
|
||||
Set<DatanodeDescriptor> datanodes =
|
||||
blockManager.getDatanodeManager().getDatanodes();
|
||||
for (DatanodeDescriptor dn : datanodes) {
|
||||
long remaining = dn.getCacheRemaining();
|
||||
for (Iterator<CachedBlock> it = dn.getPendingCached().iterator();
|
||||
it.hasNext();) {
|
||||
CachedBlock cblock = it.next();
|
||||
BlockInfo blockInfo = blockManager.
|
||||
getStoredBlock(new Block(cblock.getBlockId()));
|
||||
if (blockInfo.getNumBytes() > remaining) {
|
||||
LOG.debug("Block {}: removing from PENDING_CACHED for node {} "
|
||||
+ "because it cannot fit in remaining cache size {}.",
|
||||
cblock.getBlockId(), dn.getDatanodeUuid(), remaining);
|
||||
it.remove();
|
||||
} else {
|
||||
remaining -= blockInfo.getNumBytes();
|
||||
}
|
||||
}
|
||||
}
|
||||
for (Iterator<CachedBlock> cbIter = cachedBlocks.iterator();
|
||||
cbIter.hasNext(); ) {
|
||||
scannedBlocks++;
|
||||
|
@ -531,7 +552,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|||
DatanodeDescriptor datanode = iter.next();
|
||||
datanode.getPendingCached().remove(cblock);
|
||||
iter.remove();
|
||||
LOG.trace("Block {}: removing from PENDING_CACHED for node {}"
|
||||
LOG.trace("Block {}: removing from PENDING_CACHED for node {} "
|
||||
+ "because we already have {} cached replicas and we only" +
|
||||
" need {}",
|
||||
cblock.getBlockId(), datanode.getDatanodeUuid(), numCached,
|
||||
|
@ -686,8 +707,8 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
|
|||
long pendingCapacity = pendingBytes + datanode.getCacheRemaining();
|
||||
if (pendingCapacity < blockInfo.getNumBytes()) {
|
||||
LOG.trace("Block {}: DataNode {} is not a valid possibility " +
|
||||
"because the block has size {}, but the DataNode only has {}" +
|
||||
"bytes of cache remaining ({} pending bytes, {} already cached.",
|
||||
"because the block has size {}, but the DataNode only has {} " +
|
||||
"bytes of cache remaining ({} pending bytes, {} already cached.)",
|
||||
blockInfo.getBlockId(), datanode.getDatanodeUuid(),
|
||||
blockInfo.getNumBytes(), pendingCapacity, pendingBytes,
|
||||
datanode.getCacheRemaining());
|
||||
|
|
|
@ -416,6 +416,15 @@ public class DatanodeManager {
|
|||
return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort);
|
||||
}
|
||||
|
||||
/** @return the datanode descriptors for all nodes. */
|
||||
public Set<DatanodeDescriptor> getDatanodes() {
|
||||
final Set<DatanodeDescriptor> datanodes;
|
||||
synchronized (this) {
|
||||
datanodes = new HashSet<>(datanodeMap.values());
|
||||
}
|
||||
return datanodes;
|
||||
}
|
||||
|
||||
/** @return the Host2NodesMap */
|
||||
public Host2NodesMap getHost2DatanodeMap() {
|
||||
return this.host2DatanodeMap;
|
||||
|
|
|
@ -73,7 +73,6 @@ import org.apache.hadoop.hdfs.protocol.CachePoolStats;
|
|||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||
|
@ -87,9 +86,6 @@ import org.apache.hadoop.security.AccessControlException;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.GSet;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -1413,6 +1409,7 @@ public class TestCacheDirectives {
|
|||
*/
|
||||
private void checkPendingCachedEmpty(MiniDFSCluster cluster)
|
||||
throws Exception {
|
||||
Thread.sleep(1000);
|
||||
cluster.getNamesystem().readLock();
|
||||
try {
|
||||
final DatanodeManager datanodeManager =
|
||||
|
@ -1444,7 +1441,6 @@ public class TestCacheDirectives {
|
|||
waitForCachedBlocks(namenode, -1, numCachedReplicas,
|
||||
"testExceeds:1");
|
||||
checkPendingCachedEmpty(cluster);
|
||||
Thread.sleep(1000);
|
||||
checkPendingCachedEmpty(cluster);
|
||||
|
||||
// Try creating a file with giant-sized blocks that exceed cache capacity
|
||||
|
@ -1452,7 +1448,6 @@ public class TestCacheDirectives {
|
|||
DFSTestUtil.createFile(dfs, fileName, 4096, fileLen, CACHE_CAPACITY * 2,
|
||||
(short) 1, 0xFADED);
|
||||
checkPendingCachedEmpty(cluster);
|
||||
Thread.sleep(1000);
|
||||
checkPendingCachedEmpty(cluster);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue