HDFS-9549. TestCacheDirectives#testExceedsCapacity is flaky (Xiao Chen via cmccabe)

Colin Patrick Mccabe 2016-02-23 12:01:20 -08:00
parent d9c409a428
commit 211c78c090
4 changed files with 37 additions and 9 deletions
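In brief: the CacheReplicationMonitor change below adds a pass over each DataNode's PENDING_CACHED list that drops blocks too large to fit in the node's remaining cache, and the test now waits inside checkPendingCachedEmpty instead of sleeping between calls. The following is a minimal standalone sketch of that trimming logic; PendingBlock, the class name, and the hard-coded sizes are hypothetical stand-ins for the CachedBlock/BlockInfo bookkeeping in the real code, not HDFS APIs.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Standalone sketch of the capacity-trimming pass added to
 *  CacheReplicationMonitor#rescanCachedBlockMap. PendingBlock is a
 *  hypothetical stand-in for the CachedBlock/BlockInfo pair used there. */
public class PendingCacheTrimSketch {
  record PendingBlock(long blockId, long numBytes) {}

  /** Drop pending blocks that can no longer fit in the node's remaining
   *  cache; reserve capacity for each block that is kept. */
  static void trimPending(List<PendingBlock> pendingCached, long cacheRemaining) {
    long remaining = cacheRemaining;
    for (Iterator<PendingBlock> it = pendingCached.iterator(); it.hasNext();) {
      PendingBlock block = it.next();
      if (block.numBytes() > remaining) {
        it.remove();                      // cannot fit: drop from PENDING_CACHED
      } else {
        remaining -= block.numBytes();    // account for the space this block will use
      }
    }
  }

  public static void main(String[] args) {
    List<PendingBlock> pending = new ArrayList<>(List.of(
        new PendingBlock(1, 64L << 20),    // 64 MB
        new PendingBlock(2, 96L << 20),    // 96 MB, will not fit below
        new PendingBlock(3, 32L << 20)));  // 32 MB
    trimPending(pending, 100L << 20);      // 100 MB of cache remaining
    System.out.println(pending);           // blocks 1 and 3 survive, 2 is dropped
  }
}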

CHANGES.txt

@@ -2816,6 +2816,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9842. dfs.datanode.balance.bandwidthPerSec should accept friendly
     size units. (Lin Yiqun via Arpit Agarwal)
 
+    HDFS-9549. TestCacheDirectives#testExceedsCapacity is flaky (Xiao Chen via
+    cmccabe)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES

CacheReplicationMonitor.java

@@ -27,6 +27,7 @@ import java.util.Date;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
@@ -491,6 +492,26 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    * Blocks that are over-replicated should be removed from Datanodes.
    */
   private void rescanCachedBlockMap() {
+    // Remove pendingCached blocks that will make DN out-of-capacity.
+    Set<DatanodeDescriptor> datanodes =
+        blockManager.getDatanodeManager().getDatanodes();
+    for (DatanodeDescriptor dn : datanodes) {
+      long remaining = dn.getCacheRemaining();
+      for (Iterator<CachedBlock> it = dn.getPendingCached().iterator();
+           it.hasNext();) {
+        CachedBlock cblock = it.next();
+        BlockInfo blockInfo = blockManager.
+            getStoredBlock(new Block(cblock.getBlockId()));
+        if (blockInfo.getNumBytes() > remaining) {
+          LOG.debug("Block {}: removing from PENDING_CACHED for node {} "
+              + "because it cannot fit in remaining cache size {}.",
+              cblock.getBlockId(), dn.getDatanodeUuid(), remaining);
+          it.remove();
+        } else {
+          remaining -= blockInfo.getNumBytes();
+        }
+      }
+    }
     for (Iterator<CachedBlock> cbIter = cachedBlocks.iterator();
         cbIter.hasNext(); ) {
       scannedBlocks++;
@@ -531,7 +552,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
         DatanodeDescriptor datanode = iter.next();
         datanode.getPendingCached().remove(cblock);
         iter.remove();
-        LOG.trace("Block {}: removing from PENDING_CACHED for node {}"
+        LOG.trace("Block {}: removing from PENDING_CACHED for node {} "
             + "because we already have {} cached replicas and we only" +
             " need {}",
             cblock.getBlockId(), datanode.getDatanodeUuid(), numCached,
@@ -686,8 +707,8 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       long pendingCapacity = pendingBytes + datanode.getCacheRemaining();
       if (pendingCapacity < blockInfo.getNumBytes()) {
         LOG.trace("Block {}: DataNode {} is not a valid possibility " +
-            "because the block has size {}, but the DataNode only has {}" +
-            "bytes of cache remaining ({} pending bytes, {} already cached.",
+            "because the block has size {}, but the DataNode only has {} " +
+            "bytes of cache remaining ({} pending bytes, {} already cached.)",
             blockInfo.getBlockId(), datanode.getDatanodeUuid(),
             blockInfo.getNumBytes(), pendingCapacity, pendingBytes,
             datanode.getCacheRemaining());

DatanodeManager.java

@@ -413,6 +413,15 @@ public class DatanodeManager {
     return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort);
   }
 
+  /** @return the datanode descriptors for all nodes. */
+  public Set<DatanodeDescriptor> getDatanodes() {
+    final Set<DatanodeDescriptor> datanodes;
+    synchronized (this) {
+      datanodes = new HashSet<>(datanodeMap.values());
+    }
+    return datanodes;
+  }
+
   /** @return the Host2NodesMap */
   public Host2NodesMap getHost2DatanodeMap() {
     return this.host2DatanodeMap;

TestCacheDirectives.java

@@ -72,7 +72,6 @@ import org.apache.hadoop.hdfs.protocol.CachePoolStats;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -86,9 +85,6 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.GSet;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -1470,6 +1466,7 @@ public class TestCacheDirectives {
    */
   private void checkPendingCachedEmpty(MiniDFSCluster cluster)
       throws Exception {
+    Thread.sleep(1000);
     cluster.getNamesystem().readLock();
     try {
       final DatanodeManager datanodeManager =
@@ -1501,7 +1498,6 @@ public class TestCacheDirectives {
     waitForCachedBlocks(namenode, -1, numCachedReplicas,
         "testExceeds:1");
     checkPendingCachedEmpty(cluster);
-    Thread.sleep(1000);
     checkPendingCachedEmpty(cluster);
 
     // Try creating a file with giant-sized blocks that exceed cache capacity
@@ -1509,7 +1505,6 @@ public class TestCacheDirectives {
     DFSTestUtil.createFile(dfs, fileName, 4096, fileLen, CACHE_CAPACITY * 2,
         (short) 1, 0xFADED);
     checkPendingCachedEmpty(cluster);
-    Thread.sleep(1000);
     checkPendingCachedEmpty(cluster);
   }