HBASE-27715 Refactoring the long tryAdvanceEntry method in WALEntryStream (#5105)

Signed-off-by: Liangjun He <heliangjun@apache.org>
This commit is contained in:
Duo Zhang 2023-03-15 21:47:25 +08:00 committed by GitHub
parent c8bee238fc
commit 1f2e1f5b3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 120 additions and 98 deletions

View File

@ -206,23 +206,52 @@ class WALEntryStream implements Closeable {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION", @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
justification = "HDFS-4380") justification = "HDFS-4380")
private HasNext tryAdvanceEntry() { private HasNext prepareReader() {
if (reader == null) { if (reader != null) {
if (state != null && state != WALTailingReader.State.NORMAL) {
// reset before reading
LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath,
currentPositionOfEntry, state.resetCompression());
try {
if (currentPositionOfEntry > 0) {
reader.resetTo(currentPositionOfEntry, state.resetCompression());
} else {
// we will read from the beginning so we should always clear the compression context
reader.resetTo(-1, true);
}
} catch (IOException e) {
LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
currentPositionOfEntry, state.resetCompression(), e);
// just leave the state as is, and try resetting next time
return HasNext.RETRY;
}
} else {
return HasNext.YES;
}
}
// try open next WAL file // try open next WAL file
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId); PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
Path nextPath = queue.peek(); Path nextPath = queue.peek();
if (nextPath != null) { if (nextPath == null) {
LOG.debug("No more WAL files in queue");
// no more files in queue, this could happen for recovered queue, or for a wal group of a
// sync replication peer which has already been transited to DA or S.
setCurrentPath(null);
return HasNext.NO;
}
setCurrentPath(nextPath); setCurrentPath(nextPath);
// we need to test this prior to create the reader. If not, it is possible that, while // we need to test this prior to create the reader. If not, it is possible that, while
// opening the file, the file is still being written so its header is incomplete and we get // opening the file, the file is still being written so its header is incomplete and we get
// a header EOF, but then while we test whether it is still being written, we have already // a header EOF, but then while we test whether it is still being written, we have already
// flushed the data out and we consider it is not being written, and then we just skip over // flushed the data out and we consider it is not being written, and then we just skip over
// file, then we will lose the data written after opening... // file, then we will lose the data written after opening...
boolean beingWritten = boolean beingWritten = walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent(); LOG.debug("Creating new reader {}, startPosition={}, beingWritten={}", nextPath,
currentPositionOfEntry, beingWritten);
try { try {
reader = WALFactory.createTailingReader(fs, nextPath, conf, reader = WALFactory.createTailingReader(fs, nextPath, conf,
currentPositionOfEntry > 0 ? currentPositionOfEntry : -1); currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
return HasNext.YES;
} catch (WALHeaderEOFException e) { } catch (WALHeaderEOFException e) {
if (!eofAutoRecovery) { if (!eofAutoRecovery) {
// if we do not enable EOF auto recovery, just let the upper layer retry // if we do not enable EOF auto recovery, just let the upper layer retry
@ -232,7 +261,8 @@ class WALEntryStream implements Closeable {
// we hit EOF while reading the WAL header, usually this means we can just skip over this // we hit EOF while reading the WAL header, usually this means we can just skip over this
// file, but we need to be careful that whether this file is still being written, if so we // file, but we need to be careful that whether this file is still being written, if so we
// should retry instead of skipping. // should retry instead of skipping.
LOG.warn("EOF while trying to open WAL reader for path: {}", nextPath, e); LOG.warn("EOF while trying to open WAL reader for path: {}, startPosition={}", nextPath,
currentPositionOfEntry, e);
if (beingWritten) { if (beingWritten) {
// just retry as the file is still being written, maybe next time we could read // just retry as the file is still being written, maybe next time we could read
// something // something
@ -252,27 +282,51 @@ class WALEntryStream implements Closeable {
LOG.warn("Failed to open WAL reader for path: {}", nextPath, e); LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
return HasNext.RETRY; return HasNext.RETRY;
} }
} else {
// no more files in queue, this could happen for recovered queue, or for a wal group of a
// sync replication peer which has already been transited to DA or S.
setCurrentPath(null);
return HasNext.NO;
} }
} else if (state != null && state != WALTailingReader.State.NORMAL) {
// reset before reading private HasNext lastAttempt() {
LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath,
currentPositionOfEntry, state.resetCompression());
try { try {
if (currentPositionOfEntry > 0) {
reader.resetTo(currentPositionOfEntry, state.resetCompression()); reader.resetTo(currentPositionOfEntry, state.resetCompression());
} else {
// we will read from the beginning so we should always clear the compression context
reader.resetTo(-1, true);
}
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath, LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
currentPositionOfEntry, state.resetCompression(), e); currentPositionOfEntry, state.resetCompression(), e);
// just leave the state as is, and try resetting next time // just leave the state as is, next time we will try to reset it again, but there is a
// nasty problem is that, we will still reach here finally and try reset again to see if
// the log has been fully replicated, which is redundant, can be optimized later
return HasNext.RETRY; return HasNext.RETRY;
} }
Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
state = pair.getFirst();
// should not be written
assert !pair.getSecond();
if (!state.eof()) {
// we still have something to read after reopen, so return YES. Or there are something wrong
// and we need to retry
return state == WALTailingReader.State.NORMAL ? HasNext.YES : HasNext.RETRY;
}
// No data available after reopen
if (checkAllBytesParsed()) {
// move to the next wal file and read
dequeueCurrentLog();
return HasNext.RETRY_IMMEDIATELY;
} else {
// see HBASE-15983, if checkAllBytesParsed returns false, we need to try read from
// beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
// so when calling tryAdvanceENtry next time we will reset the reader to the beginning
// and read.
currentPositionOfEntry = 0;
currentPositionOfReader = 0;
state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
return HasNext.RETRY;
}
}
private HasNext tryAdvanceEntry() {
HasNext prepared = prepareReader();
if (prepared != HasNext.YES) {
return prepared;
} }
Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition(); Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
@ -292,46 +346,16 @@ class WALEntryStream implements Closeable {
return HasNext.RETRY_IMMEDIATELY; return HasNext.RETRY_IMMEDIATELY;
case EOF_AND_RESET: case EOF_AND_RESET:
case EOF_AND_RESET_COMPRESSION: case EOF_AND_RESET_COMPRESSION:
if (!beingWritten) { if (beingWritten) {
// no more entries in this log file, and the file is already closed, i.e, rolled
// Before dequeuing, we should always get one more attempt at reading.
// This is in case more entries came in after we opened the reader, and the log is rolled
// while we were reading. See HBASE-6758
try {
reader.resetTo(currentPositionOfEntry, state.resetCompression());
} catch (IOException e) {
LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
currentPositionOfEntry, state.resetCompression(), e);
// just leave the state as is, next time we will try to reset it again, but there is a
// nasty problem is that, we will still reach here finally and try reset again to see if
// the log has been fully replicated, which is redundant, can be optimized later
return HasNext.RETRY;
}
Pair<WALTailingReader.State, Boolean> p = readNextEntryAndRecordReaderPosition();
state = pair.getFirst();
// should not be written
assert !p.getSecond();
if (state.eof()) {
if (checkAllBytesParsed()) {
// move to the next wal file and read
dequeueCurrentLog();
return HasNext.RETRY_IMMEDIATELY;
} else {
// see HBASE-15983, if checkAllBytesParsed returns false, we need to try read from
// beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
// so when calling tryAdvanceENtry next time we will reset the reader to the beginning
// and read.
currentPositionOfEntry = 0;
currentPositionOfReader = 0;
state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
return HasNext.RETRY;
}
}
} else {
// just sleep a bit and retry to see if there are new entries coming since the file is // just sleep a bit and retry to see if there are new entries coming since the file is
// still being written // still being written
return HasNext.RETRY; return HasNext.RETRY;
} }
// no more entries in this log file, and the file is already closed, i.e, rolled
// Before dequeuing, we should always get one more attempt at reading.
// This is in case more entries came in after we opened the reader, and the log is rolled
// while we were reading. See HBASE-6758
return lastAttempt();
case ERROR_AND_RESET: case ERROR_AND_RESET:
case ERROR_AND_RESET_COMPRESSION: case ERROR_AND_RESET_COMPRESSION:
// we have meet an error, just sleep a bit and retry again // we have meet an error, just sleep a bit and retry again
@ -393,10 +417,8 @@ class WALEntryStream implements Closeable {
return false; return false;
} }
} }
if (LOG.isDebugEnabled()) {
LOG.debug("Reached the end of {} and length of the file is {}", currentPath, LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
stat == null ? "N/A" : stat.getLen()); stat == null ? "N/A" : stat.getLen());
}
metrics.incrCompletedWAL(); metrics.incrCompletedWAL();
return true; return true;
} }