diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
index f40d2fd1c8a..aac215cf138 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
@@ -18,3 +18,7 @@
 
     HDFS-6927. Initial unit tests for lazy persist files. (Arpit Agarwal)
 
+    HDFS-6929. NN periodically unlinks lazy persist files with missing
+    replicas from namespace. (Arpit Agarwal)
+
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 8054b79e9db..a94d5e3c6b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -62,6 +62,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROL
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@@ -441,6 +443,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   Daemon nnrmthread = null; // NamenodeResourceMonitor thread
 
   Daemon nnEditLogRoller = null; // NameNodeEditLogRoller thread
+
+  // A daemon to periodically clean up corrupt lazyPersist files
+  // from the name space.
+  Daemon lazyPersistFileScrubber = null;
 
   /**
    * When an active namenode will roll its own edit log, in # edits
@@ -450,6 +456,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   private final int editLogRollerInterval;
 
+  /**
+   * How frequently we scan and unlink corrupt lazyPersist files.
+   * (In seconds)
+   */
+  private final int lazyPersistFileScrubIntervalSec;
+
   private volatile boolean hasResourcesAvailable = false;
   private volatile boolean fsRunning = true;
 
@@ -857,6 +869,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT);
 
     this.inodeId = new INodeId();
+    this.lazyPersistFileScrubIntervalSec = conf.getInt(
+        DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+        DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT);
+
+    if (this.lazyPersistFileScrubIntervalSec == 0) {
+      throw new IllegalArgumentException(
+          DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC + " must be non-zero.");
+    }
+
     // For testing purposes, allow the DT secret manager to be started regardless
     // of whether security is enabled.
     alwaysUseDelegationTokensForTests = conf.getBoolean(
@@ -930,7 +951,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   @VisibleForTesting
   static RetryCache initRetryCache(Configuration conf) {
     boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
-        DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
+                                     DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
     LOG.info("Retry cache on namenode is " + (enable ? "enabled" : "disabled"));
     if (enable) {
       float heapPercent = conf.getFloat(
@@ -1159,6 +1180,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
             editLogRollerThreshold, editLogRollerInterval));
         nnEditLogRoller.start();
 
+        if (lazyPersistFileScrubIntervalSec > 0) {
+          lazyPersistFileScrubber = new Daemon(new LazyPersistFileScrubber(
+              lazyPersistFileScrubIntervalSec));
+          lazyPersistFileScrubber.start();
+        }
+
         cacheManager.startMonitorThread();
         blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
       } finally {
@@ -1211,6 +1238,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         ((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop();
         nnEditLogRoller.interrupt();
       }
+      if (lazyPersistFileScrubber != null) {
+        ((LazyPersistFileScrubber) lazyPersistFileScrubber.getRunnable()).stop();
+        lazyPersistFileScrubber.interrupt();
+      }
       if (dir != null && getFSImage() != null) {
         if (getFSImage().editLog != null) {
           getFSImage().editLog.close();
@@ -2683,6 +2714,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
             + src + " for client " + clientMachine);
       }
       INodeFile myFile = INodeFile.valueOf(inode, src, true);
+
+      if (myFile.getLazyPersistFlag()) {
+        throw new UnsupportedOperationException(
+            "Cannot append to lazy persist file " + src);
+      }
 
       // Opening an existing file for write - may need to recover lease.
       recoverLeaseInternal(myFile, src, holder, clientMachine, false);
@@ -5001,6 +5037,71 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
 
+  /**
+   * Daemon to periodically scan the namespace for lazyPersist files
+   * with missing blocks and unlink them.
+   */
+  class LazyPersistFileScrubber implements Runnable {
+    private volatile boolean shouldRun = true;
+    final int scrubIntervalSec;
+
+    public LazyPersistFileScrubber(final int scrubIntervalSec) {
+      this.scrubIntervalSec = scrubIntervalSec;
+    }
+
+    /**
+     * Periodically go over the list of lazyPersist files with missing
+     * blocks and unlink them from the namespace.
+     */
+    private void clearCorruptLazyPersistFiles()
+        throws SafeModeException, AccessControlException,
+        UnresolvedLinkException, IOException {
+
+      List<BlockCollection> filesToDelete = new ArrayList<BlockCollection>();
+
+      writeLock();
+
+      try {
+        final Iterator<Block> it = blockManager.getCorruptReplicaBlockIterator();
+
+        while (it.hasNext()) {
+          Block b = it.next();
+          BlockInfo blockInfo = blockManager.getStoredBlock(b);
+          if (blockInfo.getBlockCollection().getLazyPersistFlag()) {
+            filesToDelete.add(blockInfo.getBlockCollection());
+          }
+        }
+
+        for (BlockCollection bc : filesToDelete) {
+          LOG.warn("Removing lazyPersist file " + bc.getName() + " with no replicas.");
+          deleteInternal(bc.getName(), false, false, false);
+        }
+      } finally {
+        writeUnlock();
+      }
+    }
+
+    @Override
+    public void run() {
+      while (fsRunning && shouldRun) {
+        try {
+          clearCorruptLazyPersistFiles();
+          Thread.sleep(scrubIntervalSec * 1000);
+        } catch (InterruptedException e) {
+          FSNamesystem.LOG.info(
+              "LazyPersistFileScrubber was interrupted, exiting");
+          break;
+        } catch (Exception e) {
+          FSNamesystem.LOG.error(
+              "Ignoring exception in LazyPersistFileScrubber:", e);
+        }
+      }
+    }
+
+    public void stop() {
+      shouldRun = false;
+    }
+  }
+
   public FSImage getFSImage() {
     return fsImage;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 201560d0ccd..ed1443c68c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -397,6 +397,16 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.lazypersist.file.scrub.interval.sec</name>
+  <value>300</value>
+  <description>
+    The NameNode periodically scans the namespace for LazyPersist files with
+    missing blocks and unlinks them from the namespace. This configuration key
+    controls the interval between successive scans. Set it to a negative value
+    to disable this behavior.
+  </description>
+</property>
+
 <property>
   <name>dfs.block.access.token.enable</name>
   <value>false</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 6e7c091b04f..ddd71b1b5e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -204,6 +204,52 @@ public class TestLazyPersistFiles {
     }
   }
 
+  /**
+   * If one or more replicas of a lazyPersist file are lost, then the file
+   * must be discarded by the NN, instead of being kept around as a
+   * 'corrupt' file.
+   */
+  @Test (timeout=300000)
+  public void testLazyPersistFilesAreDiscarded()
+      throws IOException, InterruptedException {
+    startUpCluster(REPL_FACTOR,
+        new StorageType[] { RAM_DISK, DEFAULT },
+        (2 * BLOCK_SIZE - 1));  // 1 replica + delta.
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+    makeTestFile(path1, BLOCK_SIZE, true);
+    makeTestFile(path2, BLOCK_SIZE, false);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    ensureFileReplicasOnStorageType(path2, DEFAULT);
+
+    // Stop the DataNode and sleep for the time it takes the NN to
+    // detect the DN as being dead.
+    cluster.shutdownDataNodes();
+    Thread.sleep(30000L);
+    assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
+
+    // Next, wait for the replication monitor to mark the file as
+    // corrupt, plus some delta.
+    Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
+
+    // Wait for the LazyPersistFileScrubber to run, plus some delta.
+    Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
+
+    // Ensure that path1 does not exist anymore, whereas path2 does.
+    assertFalse(fs.exists(path1));
+    assertTrue(fs.exists(path2));
+
+    // We should have only one block that needs replication, i.e. the one
+    // belonging to path2.
+    assertThat(cluster.getNameNode()
+                      .getNamesystem()
+                      .getBlockManager()
+                      .getUnderReplicatedBlocksCount(),
+               is(1L));
+  }
+
   @Test (timeout=300000)
   public void testLazyPersistBlocksAreSaved()
       throws IOException, InterruptedException {
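---

Usage note (not part of the patch): to change how often the NameNode scrubs
corrupt lazyPersist files, an operator can override the new key in
hdfs-site.xml. This is a minimal sketch; the 600-second interval is
illustrative. Any negative value disables the scrubber entirely, while zero
is rejected at NameNode startup with an IllegalArgumentException:

  <property>
    <name>dfs.namenode.lazypersist.file.scrub.interval.sec</name>
    <value>600</value>
  </property>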