HDFS-5210. Fix some failing unit tests on HDFS-4949 branch. (Contributed by Andrew Wang)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1523754 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2013-09-16 18:41:27 +00:00
parent 68ec07cade
commit 85c2036029
10 changed files with 52 additions and 34 deletions

View File

@ -53,3 +53,5 @@ HDFS-4949 (Unreleased)
HDFS-5201. NativeIO: consolidate getrlimit into NativeIO#getMemlockLimit
(Contributed by Colin Patrick McCabe)
HDFS-5210. Fix some failing unit tests on HDFS-4949 branch.
(Contributed by Andrew Wang)

View File

@ -167,12 +167,14 @@ public class CacheReplicationManager extends ReportProcessor {
}
public void clearQueues() {
if (isCachingEnabled) {
blocksToUncache.clear();
synchronized (neededCacheBlocks) {
neededCacheBlocks.clear();
}
pendingCacheBlocks.clear();
}
}
public boolean isCachingEnabled() {
return isCachingEnabled;
@ -571,7 +573,8 @@ public class CacheReplicationManager extends ReportProcessor {
}
/**
* Return the safely cached replicas of a block in a BlocksMap
* Return the safe replicas (not corrupt or decomissioning/decommissioned) of
* a block in a BlocksMap
*/
List<DatanodeDescriptor> getSafeReplicas(BlocksMap map, Block block) {
List<DatanodeDescriptor> nodes = new ArrayList<DatanodeDescriptor>(3);

View File

@ -156,7 +156,7 @@ class CacheReplicationMonitor implements Runnable {
}
// Choose some replicas to cache if needed
additionalRepl = requiredRepl - effectiveRepl;
targets = new ArrayList<DatanodeDescriptor>(storedNodes);
targets = new ArrayList<DatanodeDescriptor>(storedNodes.size());
// Only target replicas that aren't already cached.
for (DatanodeDescriptor dn: storedNodes) {
if (!cachedNodes.contains(dn)) {

View File

@ -35,6 +35,9 @@ import org.apache.hadoop.hdfs.protocol.Block;
@InterfaceAudience.LimitedPrivate({"HDFS"})
public class CacheReplicationPolicy {
// Not thread-safe, but only accessed by the CacheReplicationMonitor
private static RandomData random = new RandomDataImpl();
/**
* @return List of datanodes with sufficient capacity to cache the block
*/
@ -53,8 +56,7 @@ public class CacheReplicationPolicy {
/**
* Returns a random datanode from targets, weighted by the amount of free
* cache capacity on the datanode. Prunes unsuitable datanodes from the
* targets list.
* cache capacity on the datanode.
*
* @param block Block to be cached
* @param targets List of potential cache targets
@ -75,8 +77,7 @@ public class CacheReplicationPolicy {
lottery.put(totalCacheAvailable, dn);
}
// Pick our lottery winner
RandomData r = new RandomDataImpl();
long winningTicket = r.nextLong(0, totalCacheAvailable - 1);
long winningTicket = random.nextLong(0, totalCacheAvailable - 1);
Entry<Long, DatanodeDescriptor> winner = lottery.higherEntry(winningTicket);
return winner.getValue();
}
@ -94,7 +95,10 @@ public class CacheReplicationPolicy {
List<DatanodeDescriptor> chosen =
new ArrayList<DatanodeDescriptor>(numTargets);
for (int i = 0; i < numTargets && !sufficient.isEmpty(); i++) {
chosen.add(randomDatanodeByRemainingCache(block, sufficient));
DatanodeDescriptor choice =
randomDatanodeByRemainingCache(block, sufficient);
chosen.add(choice);
sufficient.remove(choice);
}
return chosen;
}

View File

@ -368,12 +368,6 @@ class BPOfferService {
}
}
void scheduleCacheReport(long delay) {
for (BPServiceActor actor: bpServices) {
actor.scheduleCacheReport(delay);
}
}
/**
* Ask each of the actors to report a bad block hosted on another DN.
*/

View File

@ -242,17 +242,6 @@ class BPServiceActor implements Runnable {
resetBlockReportTime = true; // reset future BRs for randomness
}
void scheduleCacheReport(long delay) {
if (delay > 0) {
// Uniform random jitter by the delay
lastCacheReport = Time.monotonicNow()
- dnConf.cacheReportInterval
+ DFSUtil.getRandom().nextInt(((int)delay));
} else { // send at next heartbeat
lastCacheReport = lastCacheReport - dnConf.cacheReportInterval;
}
}
void reportBadBlocks(ExtendedBlock block) {
if (bpRegistration == null) {
return;
@ -445,6 +434,10 @@ class BPServiceActor implements Runnable {
}
DatanodeCommand cacheReport() throws IOException {
// If caching is disabled, do not send a cache report
if (dn.getFSDataset().getCacheCapacity() == 0) {
return null;
}
// send cache report if timer has expired.
DatanodeCommand cmd = null;
long startTime = Time.monotonicNow();

View File

@ -1916,7 +1916,6 @@ public class DataNode extends Configured
public void scheduleAllBlockReport(long delay) {
for(BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
bpos.scheduleBlockReport(delay);
bpos.scheduleCacheReport(delay);
}
}

View File

@ -225,7 +225,7 @@ class MappableBlock implements Closeable {
blockBuf.flip();
// Number of read chunks, including partial chunk at end
int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum;
checksumBuf.limit(chunks*bytesPerChecksum);
checksumBuf.limit(chunks*checksumSize);
fillBuffer(metaChannel, checksumBuf);
checksumBuf.flip();
checksum.verifyChunkedSums(blockBuf, checksumBuf, block.getBlockName(),

View File

@ -186,6 +186,8 @@ public final class CacheManager {
// TODO: adjustable cache replication factor
namesystem.setCacheReplicationInt(directive.getPath(),
file.getBlockReplication());
} else {
LOG.warn("Path " + directive.getPath() + " is not a file");
}
} catch (IOException ioe) {
LOG.info("addDirective " + directive +": failed to cache file: " +

View File

@ -48,9 +48,11 @@ import org.junit.Test;
public class TestCacheReplicationManager {
private static final long BLOCK_SIZE = 512;
private static final int REPL_FACTOR = 3;
private static final int NUM_DATANODES = 4;
// Most Linux installs allow a default of 64KB locked memory
private static final long CACHE_CAPACITY = 64 * 1024;
private static final long BLOCK_SIZE = 4096;
private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
private static Configuration conf;
private static MiniDFSCluster cluster = null;
@ -75,7 +77,7 @@ public class TestCacheReplicationManager {
conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
.numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
fs = cluster.getFileSystem();
@ -106,6 +108,25 @@ public class TestCacheReplicationManager {
Thread.sleep(500);
actual = countNumCachedBlocks();
}
waitForExpectedNumCachedReplicas(expected*REPL_FACTOR);
}
private void waitForExpectedNumCachedReplicas(final int expected)
throws Exception {
BlocksMap cachedBlocksMap = cacheReplManager.cachedBlocksMap;
int actual = 0;
while (expected != actual) {
Thread.sleep(500);
nn.getNamesystem().readLock();
try {
actual = 0;
for (BlockInfo b : cachedBlocksMap.getBlocks()) {
actual += cachedBlocksMap.numNodes(b);
}
} finally {
nn.getNamesystem().readUnlock();
}
}
}
@Test(timeout=60000)
@ -114,7 +135,7 @@ public class TestCacheReplicationManager {
final String pool = "friendlyPool";
nnRpc.addCachePool(new CachePoolInfo("friendlyPool"));
// Create some test files
final int numFiles = 3;
final int numFiles = 2;
final int numBlocksPerFile = 2;
final List<String> paths = new ArrayList<String>(numFiles);
for (int i=0; i<numFiles; i++) {