From 431f685300d4d9f3c52cf0a700427f1231d06e5d Mon Sep 17 00:00:00 2001
From: Andrew Wang
Date: Wed, 24 Jun 2015 14:42:33 -0700
Subject: [PATCH] HDFS-8646. Prune cached replicas from DatanodeDescriptor
 state on replica invalidation.

(cherry picked from commit afe9ea3c12e1f5a71922400eadb642960bc87ca1)
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |  3 +++
 .../server/blockmanagement/BlockManager.java       | 14 +++++++++++
 .../hdfs/server/datanode/BPServiceActor.java       |  6 +++--
 .../hadoop/hdfs/server/datanode/DataNode.java      | 15 ++++++++++-
 .../hdfs/server/namenode/CacheManager.java         | 24 +++++++++++++++---
 .../hdfs/server/namenode/FSNamesystem.java         |  1 +
 .../hdfs/server/namenode/Namesystem.java           | 18 +++++++------
 .../org/apache/hadoop/hdfs/DFSTestUtil.java        | 18 +++++++++++++
 .../server/datanode/DataNodeTestUtils.java         | 11 ++++++++
 .../server/namenode/TestCacheDirectives.java       | 25 +++++++++++++++++++
 10 files changed, 121 insertions(+), 14 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 4402048c19b..27c5ddb8801 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -613,6 +613,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8542. WebHDFS getHomeDirectory behavior does not match specification.
     (Kanaka Kumar Avvaru via jghoman)
 
+    HDFS-8646. Prune cached replicas from DatanodeDescriptor state on replica
+    invalidation. (wang)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 967e3f95c0e..8b685d33011 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBloc
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -3096,6 +3097,19 @@ public class BlockManager {
       return;
     }
 
+    CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks()
+        .get(new CachedBlock(block.getBlockId(), (short) 0, false));
+    if (cblock != null) {
+      boolean removed = false;
+      removed |= node.getPendingCached().remove(cblock);
+      removed |= node.getCached().remove(cblock);
+      removed |= node.getPendingUncached().remove(cblock);
+      if (removed) {
+        blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching "
+            + "related lists on node {}", block, node);
+      }
+    }
+
     //
     // It's possible that the block was removed because of a datanode
     // failure. If the block is still valid, check if replication is
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index f84dd997fdf..18174278228 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -709,8 +709,10 @@ class BPServiceActor implements Runnable {
         }
         processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
 
-        DatanodeCommand cmd = cacheReport();
-        processCommand(new DatanodeCommand[]{ cmd });
+        if (!dn.areCacheReportsDisabledForTests()) {
+          DatanodeCommand cmd = cacheReport();
+          processCommand(new DatanodeCommand[]{ cmd });
+        }
 
         //
         // There is no work to do; sleep until hearbeat timer elapses,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 21b013d2b90..2a026af6b74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -302,6 +302,7 @@ public class DataNode extends ReconfigurableBase
   ThreadGroup threadGroup = null;
   private DNConf dnConf;
   private volatile boolean heartbeatsDisabledForTests = false;
+  private volatile boolean cacheReportsDisabledForTests = false;
   private DataStorage storage = null;
 
   private DatanodeHttpServer httpServer = null;
@@ -1056,15 +1057,27 @@ public class DataNode extends ReconfigurableBase
 
   // used only for testing
+  @VisibleForTesting
   void setHeartbeatsDisabledForTests(
       boolean heartbeatsDisabledForTests) {
     this.heartbeatsDisabledForTests = heartbeatsDisabledForTests;
   }
-  
+
+  @VisibleForTesting
   boolean areHeartbeatsDisabledForTests() {
     return this.heartbeatsDisabledForTests;
   }
 
+  @VisibleForTesting
+  void setCacheReportsDisabledForTest(boolean disabled) {
+    this.cacheReportsDisabledForTests = disabled;
+  }
+
+  @VisibleForTesting
+  boolean areCacheReportsDisabledForTests() {
+    return this.cacheReportsDisabledForTests;
+  }
+
   /**
    * This method starts the data node with the specified conf.
    * 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index e5270adc48a..e09ba32be6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
@@ -902,9 +903,26 @@ public final class CacheManager {
     if (cachedBlock == null) {
       return;
     }
-    List<DatanodeDescriptor> datanodes = cachedBlock.getDatanodes(Type.CACHED);
-    for (DatanodeDescriptor datanode : datanodes) {
-      block.addCachedLoc(datanode);
+    List<DatanodeDescriptor> cachedDNs = cachedBlock.getDatanodes(Type.CACHED);
+    for (DatanodeDescriptor datanode : cachedDNs) {
+      // Filter out cached blocks that do not have a backing replica.
+      //
+      // This should not happen since it means the CacheManager thinks
+      // something is cached that does not exist, but it's a safety
+      // measure.
+      boolean found = false;
+      for (DatanodeInfo loc : block.getLocations()) {
+        if (loc.equals(datanode)) {
+          block.addCachedLoc(loc);
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        LOG.warn("Datanode {} is not a valid cache location for block {} "
+            + "because that node does not have a backing replica!",
+            datanode, block.getBlock().getBlockName());
+      }
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 98a148c4f3a..80d018e2db5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -6455,6 +6455,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     this.dir = dir;
   }
   /** @return the cache manager. */
+  @Override
   public CacheManager getCacheManager() {
     return cacheManager;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index 40c4765f91c..1732865a712 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -29,21 +29,23 @@ import org.apache.hadoop.security.AccessControlException;
 @InterfaceAudience.Private
 public interface Namesystem extends RwLock, SafeMode {
   /** Is this name system running? */
-  public boolean isRunning();
+  boolean isRunning();
 
   /** Check if the user has superuser privilege. */
-  public void checkSuperuserPrivilege() throws AccessControlException;
+  void checkSuperuserPrivilege() throws AccessControlException;
 
   /** @return the block pool ID */
-  public String getBlockPoolId();
+  String getBlockPoolId();
 
-  public boolean isInStandbyState();
+  boolean isInStandbyState();
 
-  public boolean isGenStampInFuture(Block block);
+  boolean isGenStampInFuture(Block block);
 
-  public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
+  void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
 
-  public void checkOperation(OperationCategory read) throws StandbyException;
+  void checkOperation(OperationCategory read) throws StandbyException;
 
-  public boolean isInSnapshot(BlockInfoUnderConstruction blockUC);
+  boolean isInSnapshot(BlockInfoUnderConstruction blockUC);
+
+  CacheManager getCacheManager();
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 1011913e695..009babf1ab4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.FsShell;
@@ -540,6 +541,23 @@ public class DFSTestUtil {
     }
   }
 
+  public static void waitForReplication(final DistributedFileSystem dfs,
+      final Path file, final short replication, int waitForMillis)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          FileStatus stat = dfs.getFileStatus(file);
+          return replication == stat.getReplication();
+        } catch (IOException e) {
+          LOG.info("getFileStatus on path " + file + " failed!", e);
+          return false;
+        }
+      }
+    }, 100, waitForMillis);
+  }
+
   /**
    * Keep accessing the given file until the namenode reports that the
    * given block in the file contains the given number of corrupt replicas.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
index 9dee7249952..2f9a3e5d93f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -53,6 +54,16 @@ public class DataNodeTestUtils {
     dn.setHeartbeatsDisabledForTests(heartbeatsDisabledForTests);
   }
 
+  /**
+   * Set if cache reports are disabled for all DNs in a mini cluster.
+   */
+  public static void setCacheReportsDisabledForTests(MiniDFSCluster cluster,
+      boolean disabled) {
+    for (DataNode dn : cluster.getDataNodes()) {
+      dn.setCacheReportsDisabledForTest(disabled);
+    }
+  }
+
   public static void triggerDeletionReport(DataNode dn) throws IOException {
     for (BPOfferService bpos : dn.getAllBpOs()) {
       bpos.triggerDeletionReportForTests();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
index 3617ee3ca3a..e56929381b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
@@ -77,6 +77,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
@@ -1453,4 +1454,28 @@ public class TestCacheDirectives {
     Thread.sleep(1000);
     checkPendingCachedEmpty(cluster);
   }
+
+  @Test(timeout=60000)
+  public void testNoBackingReplica() throws Exception {
+    // Cache all three replicas for a file.
+    final Path filename = new Path("/noback");
+    final short replication = (short) 3;
+    DFSTestUtil.createFile(dfs, filename, 1, replication, 0x0BAC);
+    dfs.addCachePool(new CachePoolInfo("pool"));
+    dfs.addCacheDirective(
+        new CacheDirectiveInfo.Builder().setPool("pool").setPath(filename)
+            .setReplication(replication).build());
+    waitForCachedBlocks(namenode, 1, replication, "testNoBackingReplica:1");
+    // Pause cache reports while we change the replication factor.
+    // This will orphan some cached replicas.
+    DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, true);
+    try {
+      dfs.setReplication(filename, (short) 1);
+      DFSTestUtil.waitForReplication(dfs, filename, (short) 1, 30000);
+      // The cache locations should drop down to 1 even without cache reports.
+      waitForCachedBlocks(namenode, 1, (short) 1, "testNoBackingReplica:2");
+    } finally {
+      DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, false);
+    }
+  }
 }
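Usage note (not part of the patch): a minimal sketch of how the two test helpers introduced above might be combined from another test. The cluster, dfs, and file path below are assumptions for illustration, mirroring the setup style of TestCacheDirectives; only setCacheReportsDisabledForTests() and waitForReplication() come from this change.

    // Illustrative only. Assumes a running MiniDFSCluster `cluster` with caching
    // enabled, its DistributedFileSystem `dfs`, and a hypothetical file /f that
    // was created with replication 3 and is fully cached.
    final Path file = new Path("/f");
    DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, true);
    try {
      // Drop the replication factor while cache reports are paused.
      dfs.setReplication(file, (short) 1);
      DFSTestUtil.waitForReplication(dfs, file, (short) 1, 30000);
      // With HDFS-8646 the NameNode prunes the cached-replica state for the
      // invalidated replicas itself, so no cache report is needed to converge.
    } finally {
      DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, false);
    }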