HDFS-3900. QJM: avoid validating log segments on log rolls. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1383041 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ca4582222e
commit
60c20e559b
|
@ -56,3 +56,5 @@ HDFS-3897. QJM: TestBlockToken fails after HDFS-3893. (atm)
|
|||
HDFS-3898. QJM: enable TCP_NODELAY for IPC (todd)
|
||||
|
||||
HDFS-3885. QJM: optimize log sync when JN is lagging behind (todd)
|
||||
|
||||
HDFS-3900. QJM: avoid validating log segments on log rolls (todd)
|
||||
|
|
|
@ -144,17 +144,18 @@ class Journal implements Closeable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Iterate over the edit logs stored locally, and set
|
||||
* {@link #curSegmentTxId} to refer to the most recently written
|
||||
* one.
|
||||
* Scan the local storage directory, and return the segment containing
|
||||
* the highest transaction.
|
||||
* @return the EditLogFile with the highest transactions, or null
|
||||
* if no files exist.
|
||||
*/
|
||||
private synchronized void scanStorage() throws IOException {
|
||||
private synchronized EditLogFile scanStorageForLatestEdits() throws IOException {
|
||||
if (!fjm.getStorageDirectory().getCurrentDir().exists()) {
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
LOG.info("Scanning storage " + fjm);
|
||||
List<EditLogFile> files = fjm.getLogFiles(0);
|
||||
curSegmentTxId = HdfsConstants.INVALID_TXID;
|
||||
|
||||
while (!files.isEmpty()) {
|
||||
EditLogFile latestLog = files.remove(files.size() - 1);
|
||||
|
@ -166,10 +167,12 @@ class Journal implements Closeable {
|
|||
"moving it aside and looking for previous log");
|
||||
latestLog.moveAsideEmptyFile();
|
||||
} else {
|
||||
curSegmentTxId = latestLog.getFirstTxId();
|
||||
break;
|
||||
return latestLog;
|
||||
}
|
||||
}
|
||||
|
||||
LOG.info("No files in " + fjm);
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -248,25 +251,30 @@ class Journal implements Closeable {
|
|||
}
|
||||
|
||||
lastPromisedEpoch.set(epoch);
|
||||
if (curSegment != null) {
|
||||
curSegment.close();
|
||||
curSegment = null;
|
||||
curSegmentTxId = HdfsConstants.INVALID_TXID;
|
||||
}
|
||||
abortCurSegment();
|
||||
|
||||
NewEpochResponseProto.Builder builder =
|
||||
NewEpochResponseProto.newBuilder();
|
||||
|
||||
// TODO: we only need to do this once, not on writer switchover.
|
||||
scanStorage();
|
||||
EditLogFile latestFile = scanStorageForLatestEdits();
|
||||
|
||||
if (curSegmentTxId != HdfsConstants.INVALID_TXID) {
|
||||
builder.setLastSegmentTxId(curSegmentTxId);
|
||||
if (latestFile != null) {
|
||||
builder.setLastSegmentTxId(latestFile.getFirstTxId());
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private void abortCurSegment() throws IOException {
|
||||
if (curSegment == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
curSegment.abort();
|
||||
curSegment = null;
|
||||
curSegmentTxId = HdfsConstants.INVALID_TXID;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a batch of edits to the journal.
|
||||
* {@see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])}
|
||||
|
@ -287,11 +295,11 @@ class Journal implements Closeable {
|
|||
// instead of rolling to a new one, which breaks one of the
|
||||
// invariants in the design. If it happens, abort the segment
|
||||
// and throw an exception.
|
||||
curSegment.abort();
|
||||
curSegment = null;
|
||||
throw new IllegalStateException(
|
||||
JournalOutOfSyncException e = new JournalOutOfSyncException(
|
||||
"Writer out of sync: it thinks it is writing segment " + segmentTxId
|
||||
+ " but current segment is " + curSegmentTxId);
|
||||
abortCurSegment();
|
||||
throw e;
|
||||
}
|
||||
|
||||
checkSync(nextTxId == firstTxnId,
|
||||
|
@ -410,8 +418,7 @@ class Journal implements Closeable {
|
|||
// The writer may have lost a connection to us and is now
|
||||
// re-connecting after the connection came back.
|
||||
// We should abort our own old segment.
|
||||
curSegment.abort();
|
||||
curSegment = null;
|
||||
abortCurSegment();
|
||||
}
|
||||
|
||||
// Paranoid sanity check: we should never overwrite a finalized log file.
|
||||
|
@ -459,11 +466,23 @@ class Journal implements Closeable {
|
|||
checkFormatted();
|
||||
checkRequest(reqInfo);
|
||||
|
||||
boolean needsValidation = true;
|
||||
|
||||
// Finalizing the log that the writer was just writing.
|
||||
if (startTxId == curSegmentTxId) {
|
||||
if (curSegment != null) {
|
||||
curSegment.close();
|
||||
curSegment = null;
|
||||
curSegmentTxId = HdfsConstants.INVALID_TXID;
|
||||
}
|
||||
|
||||
checkSync(nextTxId == endTxId + 1,
|
||||
"Trying to finalize in-progress log segment %s to end at " +
|
||||
"txid %s but only written up to txid %s",
|
||||
startTxId, endTxId, nextTxId - 1);
|
||||
// No need to validate the edit log if the client is finalizing
|
||||
// the log segment that it was just writing to.
|
||||
needsValidation = false;
|
||||
}
|
||||
|
||||
FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId);
|
||||
|
@ -473,15 +492,16 @@ class Journal implements Closeable {
|
|||
}
|
||||
|
||||
if (elf.isInProgress()) {
|
||||
// TODO: this is slow to validate when in non-recovery cases
|
||||
// we already know the length here!
|
||||
|
||||
LOG.info("Validating log about to be finalized: " + elf);
|
||||
elf.validateLog();
|
||||
|
||||
checkSync(elf.getLastTxId() == endTxId,
|
||||
"Trying to finalize log %s-%s, but current state of log " +
|
||||
"is %s", startTxId, endTxId, elf);
|
||||
if (needsValidation) {
|
||||
LOG.info("Validating log segment " + elf.getFile() + " about to be " +
|
||||
"finalized");
|
||||
elf.validateLog();
|
||||
|
||||
checkSync(elf.getLastTxId() == endTxId,
|
||||
"Trying to finalize in-progress log segment %s to end at " +
|
||||
"txid %s but log %s on disk only contains up to txid %s",
|
||||
startTxId, endTxId, elf.getFile(), elf.getLastTxId());
|
||||
}
|
||||
fjm.finalizeLogSegment(startTxId, endTxId);
|
||||
} else {
|
||||
Preconditions.checkArgument(endTxId == elf.getLastTxId(),
|
||||
|
@ -599,6 +619,8 @@ class Journal implements Closeable {
|
|||
checkFormatted();
|
||||
checkRequest(reqInfo);
|
||||
|
||||
abortCurSegment();
|
||||
|
||||
PrepareRecoveryResponseProto.Builder builder =
|
||||
PrepareRecoveryResponseProto.newBuilder();
|
||||
|
||||
|
|
|
@ -222,7 +222,7 @@ public class TestJournal {
|
|||
fail("did not fail to finalize");
|
||||
} catch (JournalOutOfSyncException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"but current state of log is", e);
|
||||
"but only written up to txid 3", e);
|
||||
}
|
||||
|
||||
// Check that, even if we re-construct the journal by scanning the
|
||||
|
@ -235,7 +235,7 @@ public class TestJournal {
|
|||
fail("did not fail to finalize");
|
||||
} catch (JournalOutOfSyncException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"but current state of log is", e);
|
||||
"disk only contains up to txid 3", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue