diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index b0fe60a77bb..82e35bd353e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -138,7 +138,7 @@ public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) { long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId) throws IOException { - return loadFSEdits(edits, expectedStartingTxId, null, null); + return loadFSEdits(edits, expectedStartingTxId, Long.MAX_VALUE, null, null); } /** @@ -147,6 +147,7 @@ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId) * along. */ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, + long maxTxnsToRead, StartupOption startOpt, MetaRecoveryContext recovery) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = createStartupProgressStep(edits); @@ -154,9 +155,10 @@ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, fsNamesys.writeLock(); try { long startTime = monotonicNow(); - FSImage.LOG.info("Start loading edits file " + edits.getName()); + FSImage.LOG.info("Start loading edits file " + edits.getName() + + " maxTxnsToRead = " + maxTxnsToRead); long numEdits = loadEditRecords(edits, false, expectedStartingTxId, - startOpt, recovery); + maxTxnsToRead, startOpt, recovery); FSImage.LOG.info("Edits file " + edits.getName() + " of size " + edits.length() + " edits # " + numEdits + " loaded in " + (monotonicNow()-startTime)/1000 + " seconds"); @@ -171,8 +173,13 @@ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, long loadEditRecords(EditLogInputStream in, boolean closeOnExit, long expectedStartingTxId, StartupOption startOpt, MetaRecoveryContext recovery) throws IOException { - FSDirectory fsDir = fsNamesys.dir; + return loadEditRecords(in, closeOnExit, expectedStartingTxId, + Long.MAX_VALUE, startOpt, recovery); + } + long loadEditRecords(EditLogInputStream in, boolean closeOnExit, + long expectedStartingTxId, long maxTxnsToRead, StartupOption startOpt, + MetaRecoveryContext recovery) throws IOException { EnumMap> opCounts = new EnumMap>(FSEditLogOpCodes.class); @@ -181,6 +188,7 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit, } fsNamesys.writeLock(); + FSDirectory fsDir = fsNamesys.dir; fsDir.writeLock(); long recentOpcodeOffsets[] = new long[4]; @@ -285,6 +293,9 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit, } numEdits++; totalEdits++; + if(numEdits >= maxTxnsToRead) { + break; + } } catch (RollingUpgradeOp.RollbackException e) { LOG.info("Stopped at OP_START_ROLLING_UPGRADE for rollback."); break; @@ -308,7 +319,11 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit, if (FSImage.LOG.isDebugEnabled()) { dumpOpCounts(opCounts); + FSImage.LOG.debug("maxTxnsToRead = " + maxTxnsToRead + + " actual edits read = " + numEdits); } + assert numEdits <= maxTxnsToRead || numEdits == 1 : + "should read at least one txn, but not more than the configured max"; } return numEdits; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index dd7df5ad6b4..5cfc0176f1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -742,7 +742,8 @@ LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) { prog.endPhase(Phase.LOADING_FSIMAGE); if (!rollingRollback) { - long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery); + long txnsAdvanced = loadEdits(editStreams, target, Long.MAX_VALUE, + startOpt, recovery); needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(), txnsAdvanced); } else { @@ -866,11 +867,12 @@ private boolean needsResaveBasedOnStaleCheckpoint( */ public long loadEdits(Iterable editStreams, FSNamesystem target) throws IOException { - return loadEdits(editStreams, target, null, null); + return loadEdits(editStreams, target, Long.MAX_VALUE, null, null); } - private long loadEdits(Iterable editStreams, - FSNamesystem target, StartupOption startOpt, MetaRecoveryContext recovery) + public long loadEdits(Iterable editStreams, + FSNamesystem target, long maxTxnsToRead, + StartupOption startOpt, MetaRecoveryContext recovery) throws IOException { LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams)); StartupProgress prog = NameNode.getStartupProgress(); @@ -885,14 +887,16 @@ private long loadEdits(Iterable editStreams, LOG.info("Reading " + editIn + " expecting start txid #" + (lastAppliedTxId + 1)); try { - loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery); + loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead, + startOpt, recovery); } finally { // Update lastAppliedTxId even in case of error, since some ops may // have been successfully applied before the error. lastAppliedTxId = loader.getLastAppliedTxId(); } // If we are in recovery mode, we may have skipped over some txids. - if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID) { + if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID + && recovery != null) { lastAppliedTxId = editIn.getLastTxId(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index f57cb4bd939..73a111ea6c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -73,7 +73,19 @@ @InterfaceStability.Evolving public class EditLogTailer { public static final Log LOG = LogFactory.getLog(EditLogTailer.class); - + + /** + * StandbyNode will hold namesystem lock to apply at most this many journal + * transactions. + * It will then release the lock and re-acquire it to load more transactions. + * By default the write lock is held for the entire journal segment. + * Fine-grained locking allows read requests to get through. + */ + public static final String DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY = + "dfs.ha.tail-edits.max-txns-per-lock"; + public static final long DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_DEFAULT = + Long.MAX_VALUE; + private final EditLogTailerThread tailerThread; private final Configuration conf; @@ -138,6 +150,12 @@ public class EditLogTailer { */ private final boolean inProgressOk; + /** + * Release the namesystem lock after loading this many transactions. + * Then re-acquire the lock to load more edits. + */ + private final long maxTxnsPerLock; + public EditLogTailer(FSNamesystem namesystem, Configuration conf) { this.tailerThread = new EditLogTailerThread(); this.conf = conf; @@ -198,6 +216,10 @@ public EditLogTailer(FSNamesystem namesystem, Configuration conf) { DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT); + this.maxTxnsPerLock = conf.getLong( + DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY, + DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_DEFAULT); + nnCount = nns.size(); // setup the iterator to endlessly loop the nns this.nnLookup = Iterators.cycle(nns); @@ -290,7 +312,8 @@ void doTailEdits() throws IOException, InterruptedException { // disk are ignored. long editsLoaded = 0; try { - editsLoaded = image.loadEdits(streams, namesystem); + editsLoaded = image.loadEdits( + streams, namesystem, maxTxnsPerLock, null, null); } catch (EditLogInputException elie) { editsLoaded = elie.getNumEditsLoaded(); throw elie; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java index 61f890c549b..c88ac57c27f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java @@ -98,8 +98,9 @@ private static Configuration getConf() { public void testTailer() throws IOException, InterruptedException, ServiceFailedException { Configuration conf = getConf(); - conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 0); conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, 100); + conf.setLong(EditLogTailer.DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY, 3); HAUtil.setAllowStandbyReads(conf, true); @@ -121,7 +122,10 @@ public void testTailer() throws IOException, InterruptedException, } HATestUtil.waitForStandbyToCatchUp(nn1, nn2); - + assertEquals("Inconsistent number of applied txns on Standby", + nn1.getNamesystem().getEditLog().getLastWrittenTxId(), + nn2.getNamesystem().getFSImage().getLastAppliedTxId() + 1); + for (int i = 0; i < DIRS_TO_MAKE / 2; i++) { assertTrue(NameNodeAdapter.getFileInfo(nn2, getDirPath(i), false, false, false).isDirectory()); @@ -134,7 +138,10 @@ public void testTailer() throws IOException, InterruptedException, } HATestUtil.waitForStandbyToCatchUp(nn1, nn2); - + assertEquals("Inconsistent number of applied txns on Standby", + nn1.getNamesystem().getEditLog().getLastWrittenTxId(), + nn2.getNamesystem().getFSImage().getLastAppliedTxId() + 1); + for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; i++) { assertTrue(NameNodeAdapter.getFileInfo(nn2, getDirPath(i), false, false, false).isDirectory());