From 53bad4eb008ec553dcdbe01e7ae975dcecde6590 Mon Sep 17 00:00:00 2001 From: Colin Patrick Mccabe <cmccabe@cloudera.com> Date: Mon, 14 Sep 2015 15:20:51 -0700 Subject: [PATCH] HDFS-8996. Consolidate validateLog and scanLog in FJM#EditLogFile (Zhe Zhang via Colin P. McCabe) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../server/namenode/FSEditLogTestUtil.java | 2 +- .../hadoop/hdfs/qjournal/server/Journal.java | 8 +-- .../namenode/EditLogFileInputStream.java | 61 +++--------------- .../hdfs/server/namenode/FSEditLogLoader.java | 63 +++++-------------- .../server/namenode/FileJournalManager.java | 32 +++++----- .../TestCheckPointForSecurityTokens.java | 4 +- .../hdfs/server/namenode/TestEditLog.java | 2 +- .../server/namenode/TestFSEditLogLoader.java | 14 ++--- 9 files changed, 58 insertions(+), 131 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index cb0fae97afa..1b21c4ddcd4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -906,6 +906,9 @@ Release 2.8.0 - UNRELEASED HDFS-9027. Refactor o.a.h.hdfs.DataStreamer#isLazyPersist() method. (Mingliang Liu via Arpit Agarwal) + HDFS-8996. Consolidate validateLog and scanLog in FJM#EditLogFile (Zhe + Zhang via Colin P. McCabe) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java index e5b9d01bd9e..7a7af06f04c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java @@ -34,7 +34,7 @@ public static FSEditLogOp getNoOpInstance() { public static long countTransactionsInStream(EditLogInputStream in) throws IOException { FSEditLogLoader.EditLogValidation validation = - FSEditLogLoader.validateEditLog(in, Long.MAX_VALUE); + FSEditLogLoader.scanEditLog(in, Long.MAX_VALUE); return (validation.getEndTxId() - in.getFirstTxId()) + 1; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index b94cd8caa46..de052c6ebb3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -192,7 +192,7 @@ private synchronized EditLogFile scanStorageForLatestEdits() throws IOException while (!files.isEmpty()) { EditLogFile latestLog = files.remove(files.size() - 1); - latestLog.scanLog(); + latestLog.scanLog(Long.MAX_VALUE, false); LOG.info("Latest log is " + latestLog); if (latestLog.getLastTxId() == HdfsServerConstants.INVALID_TXID) { // the log contains no transactions @@ -542,7 +542,7 @@ public synchronized void startLogSegment(RequestInfo reqInfo, long txid, // If it's in-progress, it should only contain one transaction, // because the "startLogSegment" transaction is written alone at the // start of each segment. - existing.scanLog(); + existing.scanLog(Long.MAX_VALUE, false); if (existing.getLastTxId() != existing.getFirstTxId()) { throw new IllegalStateException("The log file " + existing + " seems to contain valid transactions"); @@ -605,7 +605,7 @@ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId, if (needsValidation) { LOG.info("Validating log segment " + elf.getFile() + " about to be " + "finalized"); - elf.scanLog(); + elf.scanLog(Long.MAX_VALUE, false); checkSync(elf.getLastTxId() == endTxId, "Trying to finalize in-progress log segment %s to end at " + @@ -693,7 +693,7 @@ SegmentStateProto getSegmentInfo(long segmentTxId) return null; } if (elf.isInProgress()) { - elf.scanLog(); + elf.scanLog(Long.MAX_VALUE, false); } if (elf.getLastTxId() == HdfsServerConstants.INVALID_TXID) { LOG.info("Edit log file " + elf + " appears to be empty. " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java index 3bf0ab4ad6f..48df8d67159 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java @@ -301,74 +301,31 @@ public String toString() { } /** - * @param file File being validated. - * @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation - * returns after reading this or a higher ID. - * The file portion beyond this ID is potentially - * being updated. + * @param file File being scanned and validated. + * @param maxTxIdToScan Maximum Tx ID to try to scan. + * The scan returns after reading this or a higher + * ID. The file portion beyond this ID is + * potentially being updated. * @return Result of the validation * @throws IOException */ - static FSEditLogLoader.EditLogValidation validateEditLog(File file, - long maxTxIdToValidate) throws IOException { - EditLogFileInputStream in; - try { - in = new EditLogFileInputStream(file); - in.getVersion(true); // causes us to read the header - } catch (LogHeaderCorruptException e) { - // If the header is malformed or the wrong value, this indicates a corruption - LOG.warn("Log file " + file + " has no valid header", e); - return new FSEditLogLoader.EditLogValidation(0, - HdfsServerConstants.INVALID_TXID, true); - } - - try { - return FSEditLogLoader.validateEditLog(in, maxTxIdToValidate); - } finally { - IOUtils.closeStream(in); - } - } - - static FSEditLogLoader.EditLogValidation scanEditLog(File file) + static FSEditLogLoader.EditLogValidation scanEditLog(File file, + long maxTxIdToScan, boolean verifyVersion) throws IOException { EditLogFileInputStream in; try { in = new EditLogFileInputStream(file); // read the header, initialize the inputstream, but do not check the // layoutversion - in.getVersion(false); + in.getVersion(verifyVersion); } catch (LogHeaderCorruptException e) { LOG.warn("Log file " + file + " has no valid header", e); return new FSEditLogLoader.EditLogValidation(0, HdfsServerConstants.INVALID_TXID, true); } - long lastPos = 0; - long lastTxId = HdfsServerConstants.INVALID_TXID; - long numValid = 0; try { - while (true) { - long txid = HdfsServerConstants.INVALID_TXID; - lastPos = in.getPosition(); - try { - if ((txid = in.scanNextOp()) == HdfsServerConstants.INVALID_TXID) { - break; - } - } catch (Throwable t) { - FSImage.LOG.warn("Caught exception after scanning through " - + numValid + " ops from " + in - + " while determining its valid length. Position was " - + lastPos, t); - in.resync(); - FSImage.LOG.warn("After resync, position is " + in.getPosition()); - continue; - } - if (lastTxId == HdfsServerConstants.INVALID_TXID || txid > lastTxId) { - lastTxId = txid; - } - numValid++; - } - return new EditLogValidation(lastPos, lastTxId, false); + return FSEditLogLoader.scanEditLog(in, maxTxIdToScan); } finally { IOUtils.closeStream(in); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index bb36ca22cd4..c2cccb56c10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -1110,70 +1110,41 @@ private void check203UpgradeFailure(int logVersion, Throwable e) /** * Find the last valid transaction ID in the stream. * If there are invalid or corrupt transactions in the middle of the stream, - * validateEditLog will skip over them. + * scanEditLog will skip over them. * This reads through the stream but does not close it. * - * @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation - * returns after reading this or a higher ID. - * The file portion beyond this ID is potentially - * being updated. + * @param maxTxIdToScan Maximum Tx ID to try to scan. + * The scan returns after reading this or a higher ID. + * The file portion beyond this ID is potentially being + * updated. */ - static EditLogValidation validateEditLog(EditLogInputStream in, - long maxTxIdToValidate) { - long lastPos = 0; + static EditLogValidation scanEditLog(EditLogInputStream in, + long maxTxIdToScan) { + long lastPos; long lastTxId = HdfsServerConstants.INVALID_TXID; long numValid = 0; - FSEditLogOp op = null; while (true) { + long txid; lastPos = in.getPosition(); try { - if ((op = in.readOp()) == null) { + if ((txid = in.scanNextOp()) == HdfsServerConstants.INVALID_TXID) { break; } } catch (Throwable t) { - FSImage.LOG.warn("Caught exception after reading " + numValid + - " ops from " + in + " while determining its valid length." + - "Position was " + lastPos, t); + FSImage.LOG.warn("Caught exception after scanning through " + + numValid + " ops from " + in + + " while determining its valid length. Position was " + + lastPos, t); in.resync(); FSImage.LOG.warn("After resync, position is " + in.getPosition()); continue; } - if (lastTxId == HdfsServerConstants.INVALID_TXID - || op.getTransactionId() > lastTxId) { - lastTxId = op.getTransactionId(); + if (lastTxId == HdfsServerConstants.INVALID_TXID || txid > lastTxId) { + lastTxId = txid; } - if (lastTxId >= maxTxIdToValidate) { + if (lastTxId >= maxTxIdToScan) { break; } - - numValid++; - } - return new EditLogValidation(lastPos, lastTxId, false); - } - - static EditLogValidation scanEditLog(EditLogInputStream in) { - long lastPos = 0; - long lastTxId = HdfsServerConstants.INVALID_TXID; - long numValid = 0; - FSEditLogOp op = null; - while (true) { - lastPos = in.getPosition(); - try { - if ((op = in.readOp()) == null) { // TODO - break; - } - } catch (Throwable t) { - FSImage.LOG.warn("Caught exception after reading " + numValid + - " ops from " + in + " while determining its valid length." + - "Position was " + lastPos, t); - in.resync(); - FSImage.LOG.warn("After resync, position is " + in.getPosition()); - continue; - } - if (lastTxId == HdfsServerConstants.INVALID_TXID - || op.getTransactionId() > lastTxId) { - lastTxId = op.getTransactionId(); - } numValid++; } return new EditLogValidation(lastPos, lastTxId, false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java index a1488eb09bf..ff6376eee5e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java @@ -211,7 +211,7 @@ public List<RemoteEditLog> getRemoteEditLogs(long firstTxId, } if (elf.isInProgress()) { try { - elf.validateLog(getLastReadableTxId()); + elf.scanLog(getLastReadableTxId(), true); } catch (IOException e) { LOG.error("got IOException while trying to validate header of " + elf + ". Skipping.", e); @@ -348,8 +348,8 @@ synchronized public void selectInputStreams( } static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs, - Collection<EditLogInputStream> streams, long fromTxId, long maxTxIdToValidate, - boolean inProgressOk) { + Collection<EditLogInputStream> streams, long fromTxId, + long maxTxIdToScan, boolean inProgressOk) { for (EditLogFile elf : elfs) { if (elf.isInProgress()) { if (!inProgressOk) { @@ -360,7 +360,7 @@ static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs, continue; } try { - elf.validateLog(maxTxIdToValidate); + elf.scanLog(maxTxIdToScan, true); } catch (IOException e) { LOG.error("got IOException while trying to validate header of " + elf + ". Skipping.", e); @@ -404,7 +404,7 @@ synchronized public void recoverUnfinalizedSegments() throws IOException { continue; } - elf.validateLog(getLastReadableTxId()); + elf.scanLog(getLastReadableTxId(), true); if (elf.hasCorruptHeader()) { elf.moveAsideCorruptFile(); @@ -536,20 +536,16 @@ boolean containsTxId(long txId) { * Find out where the edit log ends. * This will update the lastTxId of the EditLogFile or * mark it as corrupt if it is. - * @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation - * returns after reading this or a higher ID. - * The file portion beyond this ID is potentially - * being updated. + * @param maxTxIdToScan Maximum Tx ID to try to scan. + * The scan returns after reading this or a higher ID. + * The file portion beyond this ID is potentially being + * updated. + * @param verifyVersion Whether the scan should verify the layout version */ - public void validateLog(long maxTxIdToValidate) throws IOException { - EditLogValidation val = EditLogFileInputStream.validateEditLog(file, - maxTxIdToValidate); - this.lastTxId = val.getEndTxId(); - this.hasCorruptHeader = val.hasCorruptHeader(); - } - - public void scanLog() throws IOException { - EditLogValidation val = EditLogFileInputStream.scanEditLog(file); + public void scanLog(long maxTxIdToScan, boolean verifyVersion) + throws IOException { + EditLogValidation val = EditLogFileInputStream.scanEditLog(file, + maxTxIdToScan, verifyVersion); this.lastTxId = val.getEndTxId(); this.hasCorruptHeader = val.hasCorruptHeader(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java index d5e64ae5e55..cff4e1f32f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java @@ -88,7 +88,7 @@ public void testSaveNamespace() throws IOException { for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) { EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd); assertTrue(log.isInProgress()); - log.validateLog(Long.MAX_VALUE); + log.scanLog(Long.MAX_VALUE, true); long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1; assertEquals("In-progress log " + log + " should have 5 transactions", 5, numTransactions);; @@ -105,7 +105,7 @@ public void testSaveNamespace() throws IOException { for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) { EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd); assertTrue(log.isInProgress()); - log.validateLog(Long.MAX_VALUE); + log.scanLog(Long.MAX_VALUE, true); long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1; assertEquals("In-progress log " + log + " should only have START txn", 1, numTransactions); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index 0495860cba5..7bb39a5e687 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -1229,7 +1229,7 @@ public void testAlternatingJournalFailure() throws IOException { for (EditLogInputStream edits : editStreams) { FSEditLogLoader.EditLogValidation val = - FSEditLogLoader.validateEditLog(edits, Long.MAX_VALUE); + FSEditLogLoader.scanEditLog(edits, Long.MAX_VALUE); long read = (val.getEndTxId() - edits.getFirstTxId()) + 1; LOG.info("Loading edits " + edits + " read " + read); assertEquals(startTxId, edits.getFirstTxId()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java index 3c3423a5917..47a60b0e2c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java @@ -319,7 +319,7 @@ public void testValidateEditLogWithCorruptHeader() throws IOException { rwf.close(); } EditLogValidation validation = - EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE); + EditLogFileInputStream.scanEditLog(logFile, Long.MAX_VALUE, true); assertTrue(validation.hasCorruptHeader()); } @@ -334,7 +334,7 @@ public void testValidateEditLogWithCorruptBody() throws IOException { File logFileBak = new File(testDir, logFile.getName() + ".bak"); Files.copy(logFile, logFileBak); EditLogValidation validation = - EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE); + EditLogFileInputStream.scanEditLog(logFile, Long.MAX_VALUE, true); assertTrue(!validation.hasCorruptHeader()); // We expect that there will be an OP_START_LOG_SEGMENT, followed by // NUM_TXNS opcodes, followed by an OP_END_LOG_SEGMENT. @@ -347,8 +347,8 @@ public void testValidateEditLogWithCorruptBody() throws IOException { // Restore backup, corrupt the txn opcode Files.copy(logFileBak, logFile); corruptByteInFile(logFile, txOffset); - validation = EditLogFileInputStream.validateEditLog(logFile, - Long.MAX_VALUE); + validation = EditLogFileInputStream.scanEditLog(logFile, + Long.MAX_VALUE, true); long expectedEndTxId = (txId == (NUM_TXNS + 1)) ? NUM_TXNS : (NUM_TXNS + 1); assertEquals("Failed when corrupting txn opcode at " + txOffset, @@ -365,8 +365,8 @@ public void testValidateEditLogWithCorruptBody() throws IOException { // Restore backup, corrupt the txn opcode Files.copy(logFileBak, logFile); truncateFile(logFile, txOffset); - validation = EditLogFileInputStream.validateEditLog(logFile, - Long.MAX_VALUE); + validation = EditLogFileInputStream.scanEditLog(logFile, + Long.MAX_VALUE, true); long expectedEndTxId = (txId == 0) ? HdfsServerConstants.INVALID_TXID : (txId - 1); assertEquals("Failed when corrupting txid " + txId + " txn opcode " + @@ -384,7 +384,7 @@ public void testValidateEmptyEditLog() throws IOException { // layout flags section. truncateFile(logFile, 8); EditLogValidation validation = - EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE); + EditLogFileInputStream.scanEditLog(logFile, Long.MAX_VALUE, true); assertTrue(!validation.hasCorruptHeader()); assertEquals(HdfsServerConstants.INVALID_TXID, validation.getEndTxId()); }