diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d94a213308c..4268154d046 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -949,6 +949,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8542. WebHDFS getHomeDirectory behavior does not match specification.
     (Kanaka Kumar Avvaru via jghoman)
 
+    HDFS-8546. Prune cached replicas from DatanodeDescriptor state on replica
+    invalidation. (wang)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7d3a6783cfe..368d3b04d1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -68,6 +68,7 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -3108,6 +3109,19 @@ public void removeStoredBlock(Block block, DatanodeDescriptor node) {
       return;
     }
 
+    CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks()
+        .get(new CachedBlock(block.getBlockId(), (short) 0, false));
+    if (cblock != null) {
+      boolean removed = false;
+      removed |= node.getPendingCached().remove(cblock);
+      removed |= node.getCached().remove(cblock);
+      removed |= node.getPendingUncached().remove(cblock);
+      if (removed) {
+        blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching "
+            + "related lists on node {}", block, node);
+      }
+    }
+
     //
     // It's possible that the block was removed because of a datanode
     // failure. If the block is still valid, check if replication is
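The removeStoredBlock() hunk above looks up the block in the CacheManager's set of cached blocks by constructing a throwaway CachedBlock whose replication and mark arguments are dummy values; this works because CachedBlock identity is keyed on the block ID alone. A minimal, self-contained sketch of that lookup-key idiom (CacheKey and its fields are hypothetical stand-ins, not HDFS classes):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-in for CachedBlock: equality and hashing use only
    // the block ID, so a probe object built with dummy metadata still finds
    // the real entry.
    public class CacheKey {
      final long blockId;
      final short replication; // metadata only; ignored for identity

      CacheKey(long blockId, short replication) {
        this.blockId = blockId;
        this.replication = replication;
      }

      @Override
      public boolean equals(Object o) {
        return o instanceof CacheKey && ((CacheKey) o).blockId == blockId;
      }

      @Override
      public int hashCode() {
        return (int) (blockId ^ (blockId >>> 32));
      }

      public static void main(String[] args) {
        Map<CacheKey, CacheKey> cachedBlocks = new HashMap<CacheKey, CacheKey>();
        CacheKey real = new CacheKey(42L, (short) 3);
        cachedBlocks.put(real, real);
        // Probe with a dummy replication of 0, mirroring
        // new CachedBlock(block.getBlockId(), (short) 0, false) in the hunk.
        CacheKey probe = new CacheKey(42L, (short) 0);
        System.out.println(cachedBlocks.get(probe) == real); // prints true
      }
    }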
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index f84dd997fdf..18174278228 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -709,8 +709,10 @@ private void offerService() throws Exception {
           }
           processCommand(cmds == null ? null :
               cmds.toArray(new DatanodeCommand[cmds.size()]));
-          DatanodeCommand cmd = cacheReport();
-          processCommand(new DatanodeCommand[]{ cmd });
+          if (!dn.areCacheReportsDisabledForTests()) {
+            DatanodeCommand cmd = cacheReport();
+            processCommand(new DatanodeCommand[]{ cmd });
+          }
 
           //
           // There is no work to do;  sleep until hearbeat timer elapses,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 6c8cf2b6a8c..e265dadcfbd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -301,6 +301,7 @@ public static InetSocketAddress createSocketAddr(String target) {
   ThreadGroup threadGroup = null;
   private DNConf dnConf;
   private volatile boolean heartbeatsDisabledForTests = false;
+  private volatile boolean cacheReportsDisabledForTests = false;
   private DataStorage storage = null;
   private DatanodeHttpServer httpServer = null;
@@ -1055,15 +1056,27 @@ private BPOfferService getBPOSForBlock(ExtendedBlock block)
   // used only for testing
+  @VisibleForTesting
   void setHeartbeatsDisabledForTests(
       boolean heartbeatsDisabledForTests) {
     this.heartbeatsDisabledForTests = heartbeatsDisabledForTests;
   }
-  
+
+  @VisibleForTesting
   boolean areHeartbeatsDisabledForTests() {
     return this.heartbeatsDisabledForTests;
   }
-  
+
+  @VisibleForTesting
+  void setCacheReportsDisabledForTest(boolean disabled) {
+    this.cacheReportsDisabledForTests = disabled;
+  }
+
+  @VisibleForTesting
+  boolean areCacheReportsDisabledForTests() {
+    return this.cacheReportsDisabledForTests;
+  }
+
   /**
    * This method starts the data node with the specified conf.
    *
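Both the existing heartbeat switch and the new cache-report switch are plain volatile booleans: the test thread writes the flag and the BPServiceActor loop reads it on its next iteration, and since each access is a single read or write of one variable, volatile visibility is all the coordination required. A self-contained sketch of the pattern (ReportingWorker and its methods are illustrative, not HDFS code):

    // Illustrates the volatile test-hook pattern used by DataNode:
    // one thread toggles the flag, the worker loop observes it promptly.
    public class ReportingWorker implements Runnable {
      private volatile boolean reportsDisabledForTests = false;
      private volatile boolean running = true;

      void setReportsDisabledForTests(boolean disabled) {
        this.reportsDisabledForTests = disabled; // called by the test thread
      }

      void stop() {
        this.running = false;
      }

      @Override
      public void run() {
        while (running) {
          if (!reportsDisabledForTests) {
            sendReport(); // skipped while a test holds the flag
          }
          try {
            Thread.sleep(100); // stand-in for the heartbeat wait
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      }

      private void sendReport() {
        System.out.println("report sent");
      }
    }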
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index e5270adc48a..e09ba32be6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -61,6 +61,7 @@
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
@@ -902,9 +903,26 @@ public void setCachedLocations(LocatedBlock block) {
     if (cachedBlock == null) {
       return;
     }
-    List<DatanodeDescriptor> datanodes = cachedBlock.getDatanodes(Type.CACHED);
-    for (DatanodeDescriptor datanode : datanodes) {
-      block.addCachedLoc(datanode);
+    List<DatanodeDescriptor> cachedDNs = cachedBlock.getDatanodes(Type.CACHED);
+    for (DatanodeDescriptor datanode : cachedDNs) {
+      // Filter out cached blocks that do not have a backing replica.
+      //
+      // This should not happen since it means the CacheManager thinks
+      // something is cached that does not exist, but it's a safety
+      // measure.
+      boolean found = false;
+      for (DatanodeInfo loc : block.getLocations()) {
+        if (loc.equals(datanode)) {
+          block.addCachedLoc(loc);
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        LOG.warn("Datanode {} is not a valid cache location for block {} "
+            + "because that node does not have a backing replica!",
+            datanode, block.getBlock().getBlockName());
+      }
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index d82da93f07d..b073a89abef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -6460,6 +6460,7 @@ public void setFSDirectory(FSDirectory dir) {
     this.dir = dir;
   }
   /** @return the cache manager. */
+  @Override
   public CacheManager getCacheManager() {
     return cacheManager;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index 40c4765f91c..1732865a712 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -29,21 +29,23 @@
 @InterfaceAudience.Private
 public interface Namesystem extends RwLock, SafeMode {
   /** Is this name system running? */
-  public boolean isRunning();
+  boolean isRunning();
 
   /** Check if the user has superuser privilege. */
-  public void checkSuperuserPrivilege() throws AccessControlException;
+  void checkSuperuserPrivilege() throws AccessControlException;
 
   /** @return the block pool ID */
-  public String getBlockPoolId();
+  String getBlockPoolId();
 
-  public boolean isInStandbyState();
+  boolean isInStandbyState();
 
-  public boolean isGenStampInFuture(Block block);
+  boolean isGenStampInFuture(Block block);
 
-  public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
+  void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
 
-  public void checkOperation(OperationCategory read) throws StandbyException;
+  void checkOperation(OperationCategory read) throws StandbyException;
 
-  public boolean isInSnapshot(BlockInfoUnderConstruction blockUC);
+  boolean isInSnapshot(BlockInfoUnderConstruction blockUC);
+
+  CacheManager getCacheManager();
 }
\ No newline at end of file
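With the Namesystem interface change in place, BlockManager can reach the CacheManager through the namesystem reference it already holds, and setCachedLocations() stops trusting the cached-replica list blindly: a cached location is reported to clients only if it also appears among the block's backing replicas. The scan is linear in the number of locations, which is fine at typical replication factors. A hedged sketch of that intersection using plain collections (the class, method, and string-based node identities are simplified stand-ins, not HDFS code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class CachedLocationFilter {
      /**
       * Keep only cached locations that are backed by a real replica,
       * mirroring the new setCachedLocations() logic. Strings stand in
       * for datanode identities.
       */
      static List<String> filterCachedLocs(List<String> cachedDNs,
          List<String> backingReplicas) {
        Set<String> backing = new HashSet<String>(backingReplicas);
        List<String> valid = new ArrayList<String>();
        for (String dn : cachedDNs) {
          if (backing.contains(dn)) {
            valid.add(dn);
          } else {
            // The namenode logs a warning here instead of failing the read.
            System.err.println("Datanode " + dn + " has no backing replica!");
          }
        }
        return valid;
      }

      public static void main(String[] args) {
        List<String> cached = Arrays.asList("dn1", "dn2", "dn3");
        List<String> replicas = Arrays.asList("dn1"); // replication dropped to 1
        System.out.println(filterCachedLocs(cached, replicas)); // [dn1]
      }
    }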
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index d06b0248654..96fb669435b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -79,6 +79,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.FsShell;
@@ -526,6 +527,23 @@ public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
     }
   }
 
+  public static void waitForReplication(final DistributedFileSystem dfs,
+      final Path file, final short replication, int waitForMillis)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          FileStatus stat = dfs.getFileStatus(file);
+          return replication == stat.getReplication();
+        } catch (IOException e) {
+          LOG.info("getFileStatus on path " + file + " failed!", e);
+          return false;
+        }
+      }
+    }, 100, waitForMillis);
+  }
+
   /**
    * Keep accessing the given file until the namenode reports that the
    * given block in the file contains the given number of corrupt replicas.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
index 9dee7249952..2f9a3e5d93f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -53,6 +54,16 @@ public static void setHeartbeatsDisabledForTests(DataNode dn,
     dn.setHeartbeatsDisabledForTests(heartbeatsDisabledForTests);
   }
 
+  /**
+   * Set if cache reports are disabled for all DNs in a mini cluster.
+   */
+  public static void setCacheReportsDisabledForTests(MiniDFSCluster cluster,
+      boolean disabled) {
+    for (DataNode dn : cluster.getDataNodes()) {
+      dn.setCacheReportsDisabledForTest(disabled);
+    }
+  }
+
   public static void triggerDeletionReport(DataNode dn) throws IOException {
     for (BPOfferService bpos : dn.getAllBpOs()) {
       bpos.triggerDeletionReportForTests();
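The new DFSTestUtil.waitForReplication() overload polls getFileStatus() every 100 ms until the file's replication factor matches or the timeout elapses. A hedged usage sketch under the usual MiniDFSCluster scaffolding (the demo class, path, and sizes are arbitrary):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class WaitForReplicationDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        MiniDFSCluster cluster =
            new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
        try {
          cluster.waitActive();
          DistributedFileSystem dfs = cluster.getFileSystem();
          Path file = new Path("/waitdemo");
          DFSTestUtil.createFile(dfs, file, 1024, (short) 3, 0xBEEF);
          dfs.setReplication(file, (short) 1);
          // Polls getFileStatus() every 100 ms; throws TimeoutException
          // if replication has not reached 1 within 30 seconds.
          DFSTestUtil.waitForReplication(dfs, file, (short) 1, 30000);
        } finally {
          cluster.shutdown();
        }
      }
    }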
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
index 602793427f7..cf004056e4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
@@ -76,6 +76,7 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
@@ -1510,4 +1511,28 @@ public void testExceedsCapacity() throws Exception {
     Thread.sleep(1000);
     checkPendingCachedEmpty(cluster);
   }
+
+  @Test(timeout=60000)
+  public void testNoBackingReplica() throws Exception {
+    // Cache all three replicas for a file.
+    final Path filename = new Path("/noback");
+    final short replication = (short) 3;
+    DFSTestUtil.createFile(dfs, filename, 1, replication, 0x0BAC);
+    dfs.addCachePool(new CachePoolInfo("pool"));
+    dfs.addCacheDirective(
+        new CacheDirectiveInfo.Builder().setPool("pool").setPath(filename)
+            .setReplication(replication).build());
+    waitForCachedBlocks(namenode, 1, replication, "testNoBackingReplica:1");
+    // Pause cache reports while we change the replication factor.
+    // This will orphan some cached replicas.
+    DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, true);
+    try {
+      dfs.setReplication(filename, (short) 1);
+      DFSTestUtil.waitForReplication(dfs, filename, (short) 1, 30000);
+      // The cache locations should drop down to 1 even without cache reports.
+      waitForCachedBlocks(namenode, 1, (short) 1, "testNoBackingReplica:2");
+    } finally {
+      DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, false);
+    }
+  }
 }
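The waitForCachedBlocks() assertion in the test observes the pruned state through the NameNode's RPC interface; the same invariant can also be checked from the client side, since setCachedLocations() now guarantees that every cached location attached to a LocatedBlock is also one of its replica locations. A hedged sketch of such a check (the helper class and method are ours, not an HDFS API):

    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    public class CachedLocationCheck {
      /**
       * Verify from the client side that every cached location reported
       * for a file is also a regular replica location. Assumes dfs is a
       * connected DistributedFileSystem.
       */
      public static void assertCachedLocsAreBacked(DistributedFileSystem dfs,
          String path) throws Exception {
        for (LocatedBlock lb :
            dfs.getClient().getLocatedBlocks(path, 0).getLocatedBlocks()) {
          cachedLoop:
          for (DatanodeInfo cachedLoc : lb.getCachedLocations()) {
            for (DatanodeInfo loc : lb.getLocations()) {
              if (loc.equals(cachedLoc)) {
                continue cachedLoop; // cached location has a backing replica
              }
            }
            throw new AssertionError("Cached location " + cachedLoc
                + " has no backing replica for block " + lb.getBlock());
          }
        }
      }
    }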