HDFS-8996. Consolidate validateLog and scanLog in FJM#EditLogFile (Zhe Zhang via Colin P. McCabe)

(cherry picked from commit 53bad4eb00)
Colin Patrick Mccabe 2015-09-14 15:20:51 -07:00
parent c951d56556
commit 2596695654
9 changed files with 58 additions and 131 deletions
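At a glance, the patch collapses the two entry points on FileJournalManager.EditLogFile (validateLog(long), which decoded every op and verified the layout version, and the unbounded scanLog(), which did neither) into a single scanLog(long maxTxIdToScan, boolean verifyVersion), with matching consolidation in EditLogFileInputStream and FSEditLogLoader. A minimal sketch of the call-site change, reconstructed from the hunks below rather than taken from the tree, given some EditLogFile elf and an upper bound maxTxId:

    // Before this patch: two separate methods on EditLogFile.
    elf.validateLog(maxTxId);            // bounded; decodes ops; verifies layout version
    elf.scanLog();                       // unbounded; scans ops without decoding them; no version check

    // After this patch: one method; the caller picks the bound and the version check.
    elf.scanLog(maxTxId, true);          // FileJournalManager-style call
    elf.scanLog(Long.MAX_VALUE, false);  // Journal (JournalNode)-style call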

View File

@@ -558,6 +558,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9027. Refactor o.a.h.hdfs.DataStreamer#isLazyPersist() method.
     (Mingliang Liu via Arpit Agarwal)
 
+    HDFS-8996. Consolidate validateLog and scanLog in FJM#EditLogFile (Zhe
+    Zhang via Colin P. McCabe)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@@ -34,7 +34,7 @@ public class FSEditLogTestUtil {
   public static long countTransactionsInStream(EditLogInputStream in)
       throws IOException {
     FSEditLogLoader.EditLogValidation validation =
-        FSEditLogLoader.validateEditLog(in, Long.MAX_VALUE);
+        FSEditLogLoader.scanEditLog(in, Long.MAX_VALUE);
     return (validation.getEndTxId() - in.getFirstTxId()) + 1;
   }
 }

View File

@@ -192,7 +192,7 @@ public class Journal implements Closeable {
     while (!files.isEmpty()) {
       EditLogFile latestLog = files.remove(files.size() - 1);
-      latestLog.scanLog();
+      latestLog.scanLog(Long.MAX_VALUE, false);
       LOG.info("Latest log is " + latestLog);
       if (latestLog.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
         // the log contains no transactions
@@ -542,7 +542,7 @@
       // If it's in-progress, it should only contain one transaction,
       // because the "startLogSegment" transaction is written alone at the
       // start of each segment.
-      existing.scanLog();
+      existing.scanLog(Long.MAX_VALUE, false);
       if (existing.getLastTxId() != existing.getFirstTxId()) {
         throw new IllegalStateException("The log file " +
             existing + " seems to contain valid transactions");
@@ -605,7 +605,7 @@
     if (needsValidation) {
       LOG.info("Validating log segment " + elf.getFile() + " about to be " +
           "finalized");
-      elf.scanLog();
+      elf.scanLog(Long.MAX_VALUE, false);
 
       checkSync(elf.getLastTxId() == endTxId,
           "Trying to finalize in-progress log segment %s to end at " +
@@ -693,7 +693,7 @@
       return null;
     }
     if (elf.isInProgress()) {
-      elf.scanLog();
+      elf.scanLog(Long.MAX_VALUE, false);
     }
     if (elf.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
       LOG.info("Edit log file " + elf + " appears to be empty. " +
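All four Journal call sites above pass (Long.MAX_VALUE, false), preserving what the old no-argument scanLog() did: the JournalNode scans an in-progress segment to its end and, as before, skips the layout-version check. An illustrative helper (not part of the patch; assumes the usual HDFS imports) showing the same pattern:

    // Illustrative sketch mirroring the Journal call sites above: return the highest
    // txid recorded in a possibly in-progress segment, or INVALID_TXID if it holds
    // no complete transactions.
    static long highestTxId(FileJournalManager.EditLogFile elf) throws IOException {
      elf.scanLog(Long.MAX_VALUE, false); // scan to end of file, skip layout-version check
      return elf.getLastTxId();           // set by scanLog from the scan result
    }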

View File

@@ -302,74 +302,31 @@ public class EditLogFileInputStream extends EditLogInputStream {
   }
 
   /**
-   * @param file File being validated.
-   * @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation
-   *                          returns after reading this or a higher ID.
-   *                          The file portion beyond this ID is potentially
-   *                          being updated.
+   * @param file File being scanned and validated.
+   * @param maxTxIdToScan Maximum Tx ID to try to scan.
+   *                      The scan returns after reading this or a higher
+   *                      ID. The file portion beyond this ID is
+   *                      potentially being updated.
    * @return Result of the validation
    * @throws IOException
    */
-  static FSEditLogLoader.EditLogValidation validateEditLog(File file,
-      long maxTxIdToValidate) throws IOException {
-    EditLogFileInputStream in;
-    try {
-      in = new EditLogFileInputStream(file);
-      in.getVersion(true); // causes us to read the header
-    } catch (LogHeaderCorruptException e) {
-      // If the header is malformed or the wrong value, this indicates a corruption
-      LOG.warn("Log file " + file + " has no valid header", e);
-      return new FSEditLogLoader.EditLogValidation(0,
-          HdfsServerConstants.INVALID_TXID, true);
-    }
-
-    try {
-      return FSEditLogLoader.validateEditLog(in, maxTxIdToValidate);
-    } finally {
-      IOUtils.closeStream(in);
-    }
-  }
-
-  static FSEditLogLoader.EditLogValidation scanEditLog(File file)
+  static FSEditLogLoader.EditLogValidation scanEditLog(File file,
+      long maxTxIdToScan, boolean verifyVersion)
       throws IOException {
     EditLogFileInputStream in;
     try {
       in = new EditLogFileInputStream(file);
       // read the header, initialize the inputstream, but do not check the
       // layoutversion
-      in.getVersion(false);
+      in.getVersion(verifyVersion);
     } catch (LogHeaderCorruptException e) {
       LOG.warn("Log file " + file + " has no valid header", e);
       return new FSEditLogLoader.EditLogValidation(0,
           HdfsServerConstants.INVALID_TXID, true);
     }
 
-    long lastPos = 0;
-    long lastTxId = HdfsServerConstants.INVALID_TXID;
-    long numValid = 0;
     try {
-      while (true) {
-        long txid = HdfsServerConstants.INVALID_TXID;
-        lastPos = in.getPosition();
-        try {
-          if ((txid = in.scanNextOp()) == HdfsServerConstants.INVALID_TXID) {
-            break;
-          }
-        } catch (Throwable t) {
-          FSImage.LOG.warn("Caught exception after scanning through "
-              + numValid + " ops from " + in
-              + " while determining its valid length. Position was "
-              + lastPos, t);
-          in.resync();
-          FSImage.LOG.warn("After resync, position is " + in.getPosition());
-          continue;
-        }
-        if (lastTxId == HdfsServerConstants.INVALID_TXID || txid > lastTxId) {
-          lastTxId = txid;
-        }
-        numValid++;
-      }
-      return new EditLogValidation(lastPos, lastTxId, false);
+      return FSEditLogLoader.scanEditLog(in, maxTxIdToScan);
     } finally {
      IOUtils.closeStream(in);
    }
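The merged scanEditLog(File, long, boolean) keeps the old corrupt-header handling (a LogHeaderCorruptException still yields EditLogValidation(0, INVALID_TXID, true)) and delegates the op scan to FSEditLogLoader.scanEditLog. A sketch of how a caller in the same package (the method is package-private) might interpret the result; the variable names are illustrative:

    FSEditLogLoader.EditLogValidation v =
        EditLogFileInputStream.scanEditLog(file, Long.MAX_VALUE, true);
    if (v.hasCorruptHeader()) {
      // header unreadable or wrong layout version: treat the file as corrupt
    } else if (v.getEndTxId() == HdfsServerConstants.INVALID_TXID) {
      // readable header but no complete transactions in the file
    } else {
      long lastTxId = v.getEndTxId(); // highest txid that was scanned successfully
    }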

View File

@@ -1113,70 +1113,41 @@ public class FSEditLogLoader {
   /**
    * Find the last valid transaction ID in the stream.
    * If there are invalid or corrupt transactions in the middle of the stream,
-   * validateEditLog will skip over them.
+   * scanEditLog will skip over them.
    * This reads through the stream but does not close it.
    *
-   * @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation
-   *                          returns after reading this or a higher ID.
-   *                          The file portion beyond this ID is potentially
-   *                          being updated.
+   * @param maxTxIdToScan Maximum Tx ID to try to scan.
+   *                      The scan returns after reading this or a higher ID.
+   *                      The file portion beyond this ID is potentially being
+   *                      updated.
    */
-  static EditLogValidation validateEditLog(EditLogInputStream in,
-      long maxTxIdToValidate) {
-    long lastPos = 0;
+  static EditLogValidation scanEditLog(EditLogInputStream in,
+      long maxTxIdToScan) {
+    long lastPos;
     long lastTxId = HdfsServerConstants.INVALID_TXID;
     long numValid = 0;
-    FSEditLogOp op = null;
     while (true) {
+      long txid;
       lastPos = in.getPosition();
       try {
-        if ((op = in.readOp()) == null) {
+        if ((txid = in.scanNextOp()) == HdfsServerConstants.INVALID_TXID) {
          break;
        }
      } catch (Throwable t) {
-        FSImage.LOG.warn("Caught exception after reading " + numValid +
-            " ops from " + in + " while determining its valid length." +
-            "Position was " + lastPos, t);
+        FSImage.LOG.warn("Caught exception after scanning through "
+            + numValid + " ops from " + in
+            + " while determining its valid length. Position was "
+            + lastPos, t);
         in.resync();
         FSImage.LOG.warn("After resync, position is " + in.getPosition());
         continue;
       }
-      if (lastTxId == HdfsServerConstants.INVALID_TXID
-          || op.getTransactionId() > lastTxId) {
-        lastTxId = op.getTransactionId();
+      if (lastTxId == HdfsServerConstants.INVALID_TXID || txid > lastTxId) {
+        lastTxId = txid;
       }
-      if (lastTxId >= maxTxIdToValidate) {
+      if (lastTxId >= maxTxIdToScan) {
         break;
       }
-      numValid++;
-    }
-    return new EditLogValidation(lastPos, lastTxId, false);
-  }
-
-  static EditLogValidation scanEditLog(EditLogInputStream in) {
-    long lastPos = 0;
-    long lastTxId = HdfsServerConstants.INVALID_TXID;
-    long numValid = 0;
-    FSEditLogOp op = null;
-    while (true) {
-      lastPos = in.getPosition();
-      try {
-        if ((op = in.readOp()) == null) { // TODO
-          break;
-        }
-      } catch (Throwable t) {
-        FSImage.LOG.warn("Caught exception after reading " + numValid +
-            " ops from " + in + " while determining its valid length." +
-            "Position was " + lastPos, t);
-        in.resync();
-        FSImage.LOG.warn("After resync, position is " + in.getPosition());
-        continue;
-      }
-      if (lastTxId == HdfsServerConstants.INVALID_TXID
-          || op.getTransactionId() > lastTxId) {
-        lastTxId = op.getTransactionId();
-      }
       numValid++;
     }
     return new EditLogValidation(lastPos, lastTxId, false);

View File

@@ -212,7 +212,7 @@ public class FileJournalManager implements JournalManager {
       }
       if (elf.isInProgress()) {
         try {
-          elf.validateLog(getLastReadableTxId());
+          elf.scanLog(getLastReadableTxId(), true);
         } catch (IOException e) {
           LOG.error("got IOException while trying to validate header of " +
               elf + ". Skipping.", e);
@@ -349,8 +349,8 @@
   }
 
   static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs,
-      Collection<EditLogInputStream> streams, long fromTxId, long maxTxIdToValidate,
-      boolean inProgressOk) {
+      Collection<EditLogInputStream> streams, long fromTxId,
+      long maxTxIdToScan, boolean inProgressOk) {
     for (EditLogFile elf : elfs) {
       if (elf.isInProgress()) {
         if (!inProgressOk) {
@@ -361,7 +361,7 @@
           continue;
         }
         try {
-          elf.validateLog(maxTxIdToValidate);
+          elf.scanLog(maxTxIdToScan, true);
         } catch (IOException e) {
           LOG.error("got IOException while trying to validate header of " +
               elf + ". Skipping.", e);
@@ -405,7 +405,7 @@
         continue;
       }
 
-      elf.validateLog(getLastReadableTxId());
+      elf.scanLog(getLastReadableTxId(), true);
 
      if (elf.hasCorruptHeader()) {
        elf.moveAsideCorruptFile();
@@ -537,20 +537,16 @@
      * Find out where the edit log ends.
      * This will update the lastTxId of the EditLogFile or
      * mark it as corrupt if it is.
-     * @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation
-     *                          returns after reading this or a higher ID.
-     *                          The file portion beyond this ID is potentially
-     *                          being updated.
+     * @param maxTxIdToScan Maximum Tx ID to try to scan.
+     *                      The scan returns after reading this or a higher ID.
+     *                      The file portion beyond this ID is potentially being
+     *                      updated.
+     * @param verifyVersion Whether the scan should verify the layout version
      */
-    public void validateLog(long maxTxIdToValidate) throws IOException {
-      EditLogValidation val = EditLogFileInputStream.validateEditLog(file,
-          maxTxIdToValidate);
-      this.lastTxId = val.getEndTxId();
-      this.hasCorruptHeader = val.hasCorruptHeader();
-    }
-
-    public void scanLog() throws IOException {
-      EditLogValidation val = EditLogFileInputStream.scanEditLog(file);
+    public void scanLog(long maxTxIdToScan, boolean verifyVersion)
+        throws IOException {
+      EditLogValidation val = EditLogFileInputStream.scanEditLog(file,
+          maxTxIdToScan, verifyVersion);
       this.lastTxId = val.getEndTxId();
       this.hasCorruptHeader = val.hasCorruptHeader();
     }
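FileJournalManager's call sites all pass verifyVersion=true, matching the old validateLog path, and they bound the scan with getLastReadableTxId() (or the caller-supplied maxTxIdToScan) so that the tail of a segment still being written is left alone. A sketch of the recovery-time pattern visible in the -405,7 hunk above, assuming an EditLogFile elf is in hand (illustrative, not code from the tree):

    elf.scanLog(getLastReadableTxId(), true); // bounded scan; verify the layout version
    if (elf.hasCorruptHeader()) {
      elf.moveAsideCorruptFile();             // quarantine the unreadable segment
    }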

View File

@@ -88,7 +88,7 @@ public class TestCheckPointForSecurityTokens {
       for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
         EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
         assertTrue(log.isInProgress());
-        log.validateLog(Long.MAX_VALUE);
+        log.scanLog(Long.MAX_VALUE, true);
         long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1;
         assertEquals("In-progress log " + log + " should have 5 transactions",
                      5, numTransactions);;
@@ -105,7 +105,7 @@
       for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
         EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
         assertTrue(log.isInProgress());
-        log.validateLog(Long.MAX_VALUE);
+        log.scanLog(Long.MAX_VALUE, true);
         long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1;
         assertEquals("In-progress log " + log + " should only have START txn",
                      1, numTransactions);

View File

@@ -1229,7 +1229,7 @@ public class TestEditLog {
     for (EditLogInputStream edits : editStreams) {
       FSEditLogLoader.EditLogValidation val =
-          FSEditLogLoader.validateEditLog(edits, Long.MAX_VALUE);
+          FSEditLogLoader.scanEditLog(edits, Long.MAX_VALUE);
       long read = (val.getEndTxId() - edits.getFirstTxId()) + 1;
       LOG.info("Loading edits " + edits + " read " + read);
       assertEquals(startTxId, edits.getFirstTxId());

View File

@@ -319,7 +319,7 @@ public class TestFSEditLogLoader {
       rwf.close();
     }
     EditLogValidation validation =
-        EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE);
+        EditLogFileInputStream.scanEditLog(logFile, Long.MAX_VALUE, true);
     assertTrue(validation.hasCorruptHeader());
   }
@@ -334,7 +334,7 @@
     File logFileBak = new File(testDir, logFile.getName() + ".bak");
     Files.copy(logFile, logFileBak);
     EditLogValidation validation =
-        EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE);
+        EditLogFileInputStream.scanEditLog(logFile, Long.MAX_VALUE, true);
     assertTrue(!validation.hasCorruptHeader());
     // We expect that there will be an OP_START_LOG_SEGMENT, followed by
     // NUM_TXNS opcodes, followed by an OP_END_LOG_SEGMENT.
@@ -347,8 +347,8 @@
       // Restore backup, corrupt the txn opcode
       Files.copy(logFileBak, logFile);
       corruptByteInFile(logFile, txOffset);
-      validation = EditLogFileInputStream.validateEditLog(logFile,
-          Long.MAX_VALUE);
+      validation = EditLogFileInputStream.scanEditLog(logFile,
+          Long.MAX_VALUE, true);
       long expectedEndTxId = (txId == (NUM_TXNS + 1)) ?
           NUM_TXNS : (NUM_TXNS + 1);
       assertEquals("Failed when corrupting txn opcode at " + txOffset,
@@ -365,8 +365,8 @@
       // Restore backup, corrupt the txn opcode
       Files.copy(logFileBak, logFile);
       truncateFile(logFile, txOffset);
-      validation = EditLogFileInputStream.validateEditLog(logFile,
-          Long.MAX_VALUE);
+      validation = EditLogFileInputStream.scanEditLog(logFile,
+          Long.MAX_VALUE, true);
       long expectedEndTxId = (txId == 0) ?
           HdfsServerConstants.INVALID_TXID : (txId - 1);
       assertEquals("Failed when corrupting txid " + txId + " txn opcode " +
@@ -384,7 +384,7 @@
     // layout flags section.
     truncateFile(logFile, 8);
     EditLogValidation validation =
-        EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE);
+        EditLogFileInputStream.scanEditLog(logFile, Long.MAX_VALUE, true);
     assertTrue(!validation.hasCorruptHeader());
     assertEquals(HdfsServerConstants.INVALID_TXID, validation.getEndTxId());
   }