diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
index c041cf4ed49..78bdc796b6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
@@ -53,3 +53,5 @@ HDFS-4949 (Unreleased)
     HDFS-5201. NativeIO: consolidate getrlimit into NativeIO#getMemlockLimit
     (Contributed by Colin Patrick McCabe)
 
+    HDFS-5210. Fix some failing unit tests on HDFS-4949 branch.
+    (Contributed by Andrew Wang)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java
index fb269c7689d..d58f3081465 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java
@@ -167,11 +167,13 @@ public class CacheReplicationManager extends ReportProcessor {
   }
 
   public void clearQueues() {
-    blocksToUncache.clear();
-    synchronized (neededCacheBlocks) {
-      neededCacheBlocks.clear();
+    if (isCachingEnabled) {
+      blocksToUncache.clear();
+      synchronized (neededCacheBlocks) {
+        neededCacheBlocks.clear();
+      }
+      pendingCacheBlocks.clear();
     }
-    pendingCacheBlocks.clear();
   }
 
   public boolean isCachingEnabled() {
@@ -571,7 +573,8 @@ public class CacheReplicationManager extends ReportProcessor {
   }
 
   /**
-   * Return the safely cached replicas of a block in a BlocksMap
+   * Return the safe replicas (not corrupt or decommissioning/decommissioned) of
+   * a block in a BlocksMap
    */
   List<DatanodeDescriptor> getSafeReplicas(BlocksMap map, Block block) {
     List<DatanodeDescriptor> nodes = new ArrayList<DatanodeDescriptor>(3);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index ce70b3f677f..b6255460a25 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -156,7 +156,7 @@ class CacheReplicationMonitor implements Runnable {
       }
       // Choose some replicas to cache if needed
       additionalRepl = requiredRepl - effectiveRepl;
-      targets = new ArrayList<DatanodeDescriptor>(storedNodes);
+      targets = new ArrayList<DatanodeDescriptor>(storedNodes.size());
       // Only target replicas that aren't already cached.
       for (DatanodeDescriptor dn: storedNodes) {
         if (!cachedNodes.contains(dn)) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java
index 2674f6a0c77..3bd19331ea3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java
@@ -35,6 +35,9 @@ import org.apache.hadoop.hdfs.protocol.Block;
 @InterfaceAudience.LimitedPrivate({"HDFS"})
 public class CacheReplicationPolicy {
 
+  // Not thread-safe, but only accessed by the CacheReplicationMonitor
+  private static RandomData random = new RandomDataImpl();
+
   /**
    * @return List of datanodes with sufficient capacity to cache the block
    */
@@ -53,8 +56,7 @@ public class CacheReplicationPolicy {
 
   /**
    * Returns a random datanode from targets, weighted by the amount of free
-   * cache capacity on the datanode. Prunes unsuitable datanodes from the
-   * targets list.
+   * cache capacity on the datanode.
    *
    * @param block Block to be cached
    * @param targets List of potential cache targets
@@ -75,8 +77,7 @@ public class CacheReplicationPolicy {
       lottery.put(totalCacheAvailable, dn);
     }
     // Pick our lottery winner
-    RandomData r = new RandomDataImpl();
-    long winningTicket = r.nextLong(0, totalCacheAvailable - 1);
+    long winningTicket = random.nextLong(0, totalCacheAvailable - 1);
     Entry<Long, DatanodeDescriptor> winner = lottery.higherEntry(winningTicket);
     return winner.getValue();
   }
@@ -94,7 +95,10 @@ public class CacheReplicationPolicy {
     List<DatanodeDescriptor> chosen =
        new ArrayList<DatanodeDescriptor>(numTargets);
     for (int i = 0; i < numTargets && !sufficient.isEmpty(); i++) {
-      chosen.add(randomDatanodeByRemainingCache(block, sufficient));
+      DatanodeDescriptor choice =
+          randomDatanodeByRemainingCache(block, sufficient);
+      chosen.add(choice);
+      sufficient.remove(choice);
     }
     return chosen;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 3c01345a3fe..bc78eda828a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -368,12 +368,6 @@ class BPOfferService {
     }
   }
 
-  void scheduleCacheReport(long delay) {
-    for (BPServiceActor actor: bpServices) {
-      actor.scheduleCacheReport(delay);
-    }
-  }
-
   /**
    * Ask each of the actors to report a bad block hosted on another DN.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index b4912881a05..b96292410e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -242,17 +242,6 @@ class BPServiceActor implements Runnable {
     resetBlockReportTime = true; // reset future BRs for randomness
   }
-  void scheduleCacheReport(long delay) {
-    if (delay > 0) {
-      // Uniform random jitter by the delay
-      lastCacheReport = Time.monotonicNow()
-        - dnConf.cacheReportInterval
-        + DFSUtil.getRandom().nextInt(((int)delay));
-    } else { // send at next heartbeat
-      lastCacheReport = lastCacheReport - dnConf.cacheReportInterval;
-    }
-  }
-
   void reportBadBlocks(ExtendedBlock block) {
     if (bpRegistration == null) {
       return;
     }
@@ -445,6 +434,10 @@
   }
 
   DatanodeCommand cacheReport() throws IOException {
+    // If caching is disabled, do not send a cache report
+    if (dn.getFSDataset().getCacheCapacity() == 0) {
+      return null;
+    }
     // send cache report if timer has expired.
     DatanodeCommand cmd = null;
     long startTime = Time.monotonicNow();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 778820b0a82..71158881345 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1916,7 +1916,6 @@ public class DataNode extends Configured
   public void scheduleAllBlockReport(long delay) {
     for(BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
       bpos.scheduleBlockReport(delay);
-      bpos.scheduleCacheReport(delay);
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
index de0bcd35d7b..a2a9e6c5a2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
@@ -225,7 +225,7 @@ class MappableBlock implements Closeable {
       blockBuf.flip();
       // Number of read chunks, including partial chunk at end
       int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum;
-      checksumBuf.limit(chunks*bytesPerChecksum);
+      checksumBuf.limit(chunks*checksumSize);
       fillBuffer(metaChannel, checksumBuf);
       checksumBuf.flip();
       checksum.verifyChunkedSums(blockBuf, checksumBuf, block.getBlockName(),
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index 5b82848015d..945b4250382 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -186,6 +186,8 @@ public final class CacheManager {
         // TODO: adjustable cache replication factor
         namesystem.setCacheReplicationInt(directive.getPath(),
             file.getBlockReplication());
+      } else {
+        LOG.warn("Path " + directive.getPath() + " is not a file");
       }
     } catch (IOException ioe) {
       LOG.info("addDirective " + directive +": failed to cache file: " +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java
index 557a9bf91b7..8c7037c1e6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java
@@ -48,9 +48,11 @@ import org.junit.Test;
 
 public class TestCacheReplicationManager {
 
+  private static final long BLOCK_SIZE = 512;
+  private static final int REPL_FACTOR = 3;
+  private static final int NUM_DATANODES = 4;
   // Most Linux installs allow a default of 64KB locked memory
-  private static final long CACHE_CAPACITY = 64 * 1024;
-  private static final long BLOCK_SIZE = 4096;
+  private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
 
   private static Configuration conf;
   private static MiniDFSCluster cluster = null;
@@ -75,7 +77,7 @@ public class TestCacheReplicationManager {
     conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
 
     cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(1).build();
+        .numDataNodes(NUM_DATANODES).build();
     cluster.waitActive();
 
     fs = cluster.getFileSystem();
@@ -106,6 +108,25 @@ public class TestCacheReplicationManager {
       Thread.sleep(500);
       actual = countNumCachedBlocks();
     }
+    waitForExpectedNumCachedReplicas(expected*REPL_FACTOR);
+  }
+
+  private void waitForExpectedNumCachedReplicas(final int expected)
+      throws Exception {
+    BlocksMap cachedBlocksMap = cacheReplManager.cachedBlocksMap;
+    int actual = 0;
+    while (expected != actual) {
+      Thread.sleep(500);
+      nn.getNamesystem().readLock();
+      try {
+        actual = 0;
+        for (BlockInfo b : cachedBlocksMap.getBlocks()) {
+          actual += cachedBlocksMap.numNodes(b);
+        }
+      } finally {
+        nn.getNamesystem().readUnlock();
+      }
+    }
   }
 
 
@@ -114,7 +135,7 @@ public class TestCacheReplicationManager {
     final String pool = "friendlyPool";
     nnRpc.addCachePool(new CachePoolInfo("friendlyPool"));
     // Create some test files
-    final int numFiles = 3;
+    final int numFiles = 2;
     final int numBlocksPerFile = 2;
     final List<Path> paths = new ArrayList<Path>(numFiles);
     for (int i=0; i