diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 55c0a8b612e..b1bcdf81d36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -244,12 +244,14 @@ Trunk (Unreleased)
 
     HDFS-5636. Enforce a max TTL per cache pool. (awang via cmccabe)
 
   OPTIMIZATIONS
+    HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
 
     HDFS-5665. Remove the unnecessary writeLock while initializing CacheManager
     in FsNameSystem Ctor. (Uma Maheswara Rao G via Andrew Wang)
 
   BUG FIXES
+
     HADOOP-9635 Fix potential Stack Overflow in DomainSocket.c (V. Karthik
     Kumar via cmccabe)
 
@@ -456,6 +458,10 @@ Trunk (Unreleased)
     HDFS-5701. Fix the CacheAdmin -addPool -maxTtl option name.
     (Stephen Chu via wang)
 
+    HDFS-5708. The CacheManager throws a NPE in the DataNode logs when
+    processing cache reports that refer to a block not known to the
+    BlockManager. (cmccabe via wang)
+
   BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS
 
     HDFS-4985. Add storage type to the protocol and expose it in block report
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index 5aa440fb6f6..e86f345a499 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -460,14 +460,21 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
             directive.getReplication()) * blockInfo.getNumBytes();
         cachedTotal += cachedByBlock;
 
-        if (mark != ocblock.getMark()) {
-          // Mark hasn't been set in this scan, so update replication and mark.
+        if ((mark != ocblock.getMark()) ||
+            (ocblock.getReplication() < directive.getReplication())) {
+          //
+          // Overwrite the block's replication and mark in two cases:
+          //
+          // 1. If the mark on the CachedBlock is different from the mark for
+          //    this scan, that means the block hasn't been updated during this
+          //    scan, and we should overwrite whatever is there, since it is no
+          //    longer valid.
+          //
+          // 2. If the replication in the CachedBlock is less than what the
+          //    directive asks for, we want to increase the block's replication
+          //    field to what the directive asks for.
+          //
           ocblock.setReplicationAndMark(directive.getReplication(), mark);
-        } else {
-          // Mark already set in this scan.  Set replication to highest value in
-          // any CacheDirective that covers this file.
-          ocblock.setReplicationAndMark((short)Math.max(
-              directive.getReplication(), ocblock.getReplication()), mark);
         }
       }
     }
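
The hunk above is worth a second look: dropping the else-branch's Math.max() is safe because the new condition only ever raises the stored replication within a scan. The first directive to see a stale mark overwrites unconditionally; every later directive covering the same block overwrites only if it asks for more replicas, so the field still converges to the highest requested value. A minimal, self-contained sketch of that invariant (MockCachedBlock and MarkSweepSketch are hypothetical stand-ins, not the real CachedBlock class):

// Hypothetical stand-in for CachedBlock's replication/mark fields, used only
// to demonstrate the convergence property of the patched condition.
class MockCachedBlock {
  private short replication;
  private boolean mark;
  short getReplication() { return replication; }
  boolean getMark() { return mark; }
  void setReplicationAndMark(short replication, boolean mark) {
    this.replication = replication;
    this.mark = mark;
  }
}

public class MarkSweepSketch {
  public static void main(String[] args) {
    MockCachedBlock ocblock = new MockCachedBlock();
    boolean mark = true;                     // flips at the start of each scan
    short[] wantedByDirectives = {2, 3, 1};  // three directives cover this block
    for (short wanted : wantedByDirectives) {
      // The patched condition: stale mark, or stored replication too low.
      if ((mark != ocblock.getMark()) ||
          (ocblock.getReplication() < wanted)) {
        ocblock.setReplicationAndMark(wanted, mark);
      }
    }
    // Prints 3, the maximum, matching what the old Math.max() code produced.
    System.out.println(ocblock.getReplication());
  }
}
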
+ return "not tracked by the BlockManager"; + } else if (!blockInfo.isComplete()) { + // When a cached block changes state from complete to some other state + // on the DataNode (perhaps because of append), it will begin the + // uncaching process. However, the uncaching process is not + // instantaneous, especially if clients have pinned the block. So + // there may be a period of time when incomplete blocks remain cached + // on the DataNodes. + return "not complete"; + } else if (cblock.getReplication() == 0) { + // Since 0 is not a valid value for a cache directive's replication + // field, seeing a replication of 0 on a CacheBlock means that it + // has never been reached by any sweep. + return "not needed by any directives"; + } else if (cblock.getMark() != mark) { + // Although the block was needed in the past, we didn't reach it during + // the current sweep. Therefore, it doesn't need to be cached any more. + return "no longer needed by any directives"; + } + return null; + } + /** * Scan through the cached block map. * Any blocks which are under-replicated should be assigned new Datanodes. @@ -508,11 +545,17 @@ public class CacheReplicationMonitor extends Thread implements Closeable { iter.remove(); } } - // If the block's mark doesn't match with the mark of this scan, that - // means that this block couldn't be reached during this scan. That means - // it doesn't need to be cached any more. - int neededCached = (cblock.getMark() != mark) ? - 0 : cblock.getReplication(); + BlockInfo blockInfo = blockManager. + getStoredBlock(new Block(cblock.getBlockId())); + String reason = findReasonForNotCaching(cblock, blockInfo); + int neededCached = 0; + if (reason != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("not caching " + cblock + " because it is " + reason); + } + } else { + neededCached = cblock.getReplication(); + } int numCached = cached.size(); if (numCached >= neededCached) { // If we have enough replicas, drop all pending cached. @@ -612,8 +655,10 @@ public class CacheReplicationMonitor extends Thread implements Closeable { BlockInfo blockInfo = blockManager. 
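
With these hunks, the rescan derives neededCached from a single helper, findReasonForNotCaching(), which doubles as the source of the debug message. A condensed restatement of its priority order, with the BlockInfo and CachedBlock lookups replaced by plain booleans (an illustrative model, not the actual method signature):

class NotCachingReasonSketch {
  // Illustrative model of findReasonForNotCaching()'s decision order; the
  // real method inspects BlockInfo and CachedBlock rather than booleans.
  static String reasonForNotCaching(boolean trackedByBlockManager,
      boolean complete, short replication, boolean markMatchesCurrentScan) {
    if (!trackedByBlockManager) {
      return "not tracked by the BlockManager";    // stale or bogus cache report
    } else if (!complete) {
      return "not complete";                       // e.g. reopened for append
    } else if (replication == 0) {
      return "not needed by any directives";       // never reached by any sweep
    } else if (!markMatchesCurrentScan) {
      return "no longer needed by any directives"; // directive removed or expired
    }
    return null;                                   // null: keep the block cached
  }
}

The first matching clause wins, so "not tracked" shadows everything else, and a null return is the only path that lets rescanCachedBlockMap assign a nonzero neededCached.
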
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index e25913d9cb9..f24b386df16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -940,39 +939,28 @@ public final class CacheManager {
       final List<Long> blockIds) {
     CachedBlocksList cached = datanode.getCached();
     cached.clear();
+    CachedBlocksList cachedList = datanode.getCached();
+    CachedBlocksList pendingCachedList = datanode.getPendingCached();
     for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) {
-      Block block = new Block(iter.next());
-      BlockInfo blockInfo = blockManager.getStoredBlock(block);
-      if (!blockInfo.isComplete()) {
-        LOG.warn("Ignoring block id " + block.getBlockId() + ", because " +
-            "it is in not complete yet. It is in state " +
-            blockInfo.getBlockUCState());
-        continue;
-      }
-      Collection<DatanodeDescriptor> corruptReplicas =
-          blockManager.getCorruptReplicas(blockInfo);
-      if ((corruptReplicas != null) && corruptReplicas.contains(datanode)) {
-        // The NameNode will eventually remove or update the corrupt block.
-        // Until then, we pretend that it isn't cached.
-        LOG.warn("Ignoring cached replica on " + datanode + " of " + block +
-            " because it is corrupt.");
-        continue;
-      }
+      long blockId = iter.next();
       CachedBlock cachedBlock =
-          new CachedBlock(block.getBlockId(), (short)0, false);
+          new CachedBlock(blockId, (short)0, false);
       CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock);
-      // Use the existing CachedBlock if it's present; otherwise,
-      // insert a new one.
+      // Add the block ID from the cache report to the cachedBlocks map
+      // if it's not already there.
       if (prevCachedBlock != null) {
        cachedBlock = prevCachedBlock;
       } else {
         cachedBlocks.put(cachedBlock);
       }
-      if (!cachedBlock.isPresent(datanode.getCached())) {
-        datanode.getCached().add(cachedBlock);
+      // Add the block to the datanode's implicit cached block list
+      // if it's not already there.  Similarly, remove it from the pending
+      // cached block list if it exists there.
+      if (!cachedBlock.isPresent(cachedList)) {
+        cachedList.add(cachedBlock);
       }
-      if (cachedBlock.isPresent(datanode.getPendingCached())) {
-        datanode.getPendingCached().remove(cachedBlock);
+      if (cachedBlock.isPresent(pendingCachedList)) {
+        pendingCachedList.remove(cachedBlock);
       }
     }
   }
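
This hunk is the NPE fix itself: the old loop called blockManager.getStoredBlock(block) and immediately dereferenced the result, so a cache report naming a block the BlockManager had never seen crashed the report handler. The rewritten loop records bare block IDs and leaves all validation to the CacheReplicationMonitor's next scan. A stripped-down model of the new control flow (plain java.util collections stand in for CachedBlocksList and the cachedBlocks map):

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Stripped-down model of processCacheReportImpl(): reported block IDs go
// straight into per-datanode bookkeeping, so an ID the BlockManager has
// never heard of cannot trigger an NPE here; the monitor flags it on its
// next scan as "not tracked by the BlockManager".
class CacheReportSketch {
  private final Set<Long> cachedBlocks = new HashSet<>(); // stands in for the cachedBlocks map

  void processCacheReport(Set<Long> cachedList, Set<Long> pendingCachedList,
      List<Long> blockIds) {
    cachedList.clear();
    for (long blockId : blockIds) {
      cachedBlocks.add(blockId);         // register the ID if it's new
      cachedList.add(blockId);           // datanode's implicit cached list
      pendingCachedList.remove(blockId); // no longer pending once reported
    }
  }
}

Note the behavior change this implies: the incomplete case resurfaces as the "not complete" reason during the monitor's sweep, while the corrupt-replica screening that used to happen at report time is dropped entirely.
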
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
index 916e1fa9829..6ab808ea167 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
 import org.apache.hadoop.hdfs.protocol.CachePoolStats;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
@@ -796,7 +797,15 @@ public class TestCacheDirectives {
       }
     }, 500, 60000);
 
+    // Send a cache report referring to a bogus block.  It is important that
+    // the NameNode be robust against this.
     NamenodeProtocols nnRpc = namenode.getRpcServer();
+    DataNode dn0 = cluster.getDataNodes().get(0);
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    LinkedList<Long> bogusBlockIds = new LinkedList<Long>();
+    bogusBlockIds.add(999999L);
+    nnRpc.cacheReport(dn0.getDNRegistrationForBP(bpid), bpid, bogusBlockIds);
+
     Path rootDir = helper.getDefaultWorkingDirectory(dfs);
     // Create the pool
     final String pool = "friendlyPool";
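
Pulled out into a standalone test, the bogus-report scenario would look roughly like this (the test name and shape are illustrative; the cacheReport call and the namenode/cluster fields are the same ones exercised in the hunk above):

// Illustrative standalone form of the regression check added above; the
// real coverage lives inline in TestCacheDirectives.
@Test(timeout=60000)
public void testBogusCacheReportDoesNotCrashNameNode() throws Exception {
  NamenodeProtocols nnRpc = namenode.getRpcServer();
  DataNode dn0 = cluster.getDataNodes().get(0);
  String bpid = cluster.getNamesystem().getBlockPoolId();
  LinkedList<Long> bogusBlockIds = new LinkedList<Long>();
  bogusBlockIds.add(999999L);  // no block with this ID exists in the cluster
  // Before HDFS-5708 this call hit an NPE in CacheManager's report
  // processing; now the unknown ID is simply recorded and later logged as
  // "not tracked by the BlockManager" by the CacheReplicationMonitor.
  nnRpc.cacheReport(dn0.getDNRegistrationForBP(bpid), bpid, bogusBlockIds);
}
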