HBASE-27715 Refactoring the long tryAdvanceEntry method in WALEntryStream (#5105)

Signed-off-by: Liangjun He <heliangjun@apache.org>
(cherry picked from commit 1f2e1f5b3a)
Duo Zhang 2023-03-15 21:47:25 +08:00
parent b5dcd48c64
commit ef8a981f22
1 changed file with 120 additions and 98 deletions
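
In short, this change splits the original long tryAdvanceEntry() into two helpers plus a much smaller dispatcher. A condensed sketch of the resulting structure (method bodies elided; the names and wiring are taken from the diff below, the comments are paraphrased summaries):

  // Opens the next WAL from the queue if there is no current reader, or resets the
  // existing reader when the previous read left a non-NORMAL state. Returns YES when
  // it is safe to read, NO when the queue is empty, RETRY/RETRY_IMMEDIATELY otherwise.
  private HasNext prepareReader() { /* see diff */ }

  // One final re-read of a rolled (no longer being written) WAL before dequeuing it,
  // so entries appended just before the roll are not lost (HBASE-6758).
  private HasNext lastAttempt() { /* see diff */ }

  private HasNext tryAdvanceEntry() {
    HasNext prepared = prepareReader();
    if (prepared != HasNext.YES) {
      return prepared;
    }
    // read the next entry and switch on the reader state; the EOF case on a
    // closed (rolled) file delegates to lastAttempt()
    ...
  }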

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java

@@ -206,73 +206,127 @@ class WALEntryStream implements Closeable {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
       justification = "HDFS-4380")
-  private HasNext tryAdvanceEntry() {
-    if (reader == null) {
-      // try open next WAL file
-      PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
-      Path nextPath = queue.peek();
-      if (nextPath != null) {
-        setCurrentPath(nextPath);
-        // we need to test this prior to create the reader. If not, it is possible that, while
-        // opening the file, the file is still being written so its header is incomplete and we get
-        // a header EOF, but then while we test whether it is still being written, we have already
-        // flushed the data out and we consider it is not being written, and then we just skip over
-        // file, then we will lose the data written after opening...
-        boolean beingWritten =
-          walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
-        try {
-          reader = WALFactory.createTailingReader(fs, nextPath, conf,
-            currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
-        } catch (WALHeaderEOFException e) {
-          if (!eofAutoRecovery) {
-            // if we do not enable EOF auto recovery, just let the upper layer retry
-            // the replication will be stuck usually, and need to be fixed manually
-            return HasNext.RETRY;
-          }
-          // we hit EOF while reading the WAL header, usually this means we can just skip over this
-          // file, but we need to be careful that whether this file is still being written, if so we
-          // should retry instead of skipping.
-          LOG.warn("EOF while trying to open WAL reader for path: {}", nextPath, e);
-          if (beingWritten) {
-            // just retry as the file is still being written, maybe next time we could read
-            // something
-            return HasNext.RETRY;
-          } else {
-            // the file is not being written so we are safe to just skip over it
-            dequeueCurrentLog();
-            return HasNext.RETRY_IMMEDIATELY;
-          }
-        } catch (LeaseNotRecoveredException e) {
-          // HBASE-15019 the WAL was not closed due to some hiccup.
-          LOG.warn("Try to recover the WAL lease " + nextPath, e);
-          AbstractFSWALProvider.recoverLease(conf, nextPath);
-          return HasNext.RETRY;
-        } catch (IOException | NullPointerException e) {
-          // For why we need to catch NPE here, see HDFS-4380 for more details
-          LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
-          return HasNext.RETRY;
-        }
-      } else {
-        // no more files in queue, this could happen for recovered queue, or for a wal group of a
-        // sync replication peer which has already been transited to DA or S.
-        setCurrentPath(null);
-        return HasNext.NO;
-      }
-    } else if (state != null && state != WALTailingReader.State.NORMAL) {
-      // reset before reading
-      try {
-        if (currentPositionOfEntry > 0) {
-          reader.resetTo(currentPositionOfEntry, state.resetCompression());
-        } else {
-          // we will read from the beginning so we should always clear the compression context
-          reader.resetTo(-1, true);
-        }
-      } catch (IOException e) {
-        LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
-          currentPositionOfEntry, state.resetCompression(), e);
-        // just leave the state as is, and try resetting next time
-      }
-    }
+  private HasNext prepareReader() {
+    if (reader != null) {
+      if (state != null && state != WALTailingReader.State.NORMAL) {
+        // reset before reading
+        LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath,
+          currentPositionOfEntry, state.resetCompression());
+        try {
+          if (currentPositionOfEntry > 0) {
+            reader.resetTo(currentPositionOfEntry, state.resetCompression());
+          } else {
+            // we will read from the beginning so we should always clear the compression context
+            reader.resetTo(-1, true);
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
+            currentPositionOfEntry, state.resetCompression(), e);
+          // just leave the state as is, and try resetting next time
+          return HasNext.RETRY;
+        }
+      }
+      return HasNext.YES;
+    }
+    // try open next WAL file
+    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
+    Path nextPath = queue.peek();
+    if (nextPath == null) {
+      LOG.debug("No more WAL files in queue");
+      // no more files in queue, this could happen for recovered queue, or for a wal group of a
+      // sync replication peer which has already been transited to DA or S.
+      setCurrentPath(null);
+      return HasNext.NO;
+    }
+    setCurrentPath(nextPath);
+    // we need to test this prior to create the reader. If not, it is possible that, while
+    // opening the file, the file is still being written so its header is incomplete and we get
+    // a header EOF, but then while we test whether it is still being written, we have already
+    // flushed the data out and we consider it is not being written, and then we just skip over
+    // file, then we will lose the data written after opening...
+    boolean beingWritten = walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
+    LOG.debug("Creating new reader {}, startPosition={}, beingWritten={}", nextPath,
+      currentPositionOfEntry, beingWritten);
+    try {
+      reader = WALFactory.createTailingReader(fs, nextPath, conf,
+        currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
+      return HasNext.YES;
+    } catch (WALHeaderEOFException e) {
+      if (!eofAutoRecovery) {
+        // if we do not enable EOF auto recovery, just let the upper layer retry
+        // the replication will be stuck usually, and need to be fixed manually
+        return HasNext.RETRY;
+      }
+      // we hit EOF while reading the WAL header, usually this means we can just skip over this
+      // file, but we need to be careful that whether this file is still being written, if so we
+      // should retry instead of skipping.
+      LOG.warn("EOF while trying to open WAL reader for path: {}, startPosition={}", nextPath,
+        currentPositionOfEntry, e);
+      if (beingWritten) {
+        // just retry as the file is still being written, maybe next time we could read
+        // something
+        return HasNext.RETRY;
+      } else {
+        // the file is not being written so we are safe to just skip over it
+        dequeueCurrentLog();
+        return HasNext.RETRY_IMMEDIATELY;
+      }
+    } catch (LeaseNotRecoveredException e) {
+      // HBASE-15019 the WAL was not closed due to some hiccup.
+      LOG.warn("Try to recover the WAL lease " + nextPath, e);
+      AbstractFSWALProvider.recoverLease(conf, nextPath);
+      return HasNext.RETRY;
+    } catch (IOException | NullPointerException e) {
+      // For why we need to catch NPE here, see HDFS-4380 for more details
+      LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
+      return HasNext.RETRY;
+    }
+  }
+
+  private HasNext lastAttempt() {
+    LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath,
+      currentPositionOfEntry, state.resetCompression());
+    try {
+      reader.resetTo(currentPositionOfEntry, state.resetCompression());
+    } catch (IOException e) {
+      LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
+        currentPositionOfEntry, state.resetCompression(), e);
+      // just leave the state as is, next time we will try to reset it again, but there is a
+      // nasty problem is that, we will still reach here finally and try reset again to see if
+      // the log has been fully replicated, which is redundant, can be optimized later
+      return HasNext.RETRY;
+    }
+    Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
+    state = pair.getFirst();
+    // should not be written
+    assert !pair.getSecond();
+    if (!state.eof()) {
+      // we still have something to read after reopen, so return YES. Or there are something wrong
+      // and we need to retry
+      return state == WALTailingReader.State.NORMAL ? HasNext.YES : HasNext.RETRY;
+    }
+    // No data available after reopen
+    if (checkAllBytesParsed()) {
+      // move to the next wal file and read
+      dequeueCurrentLog();
+      return HasNext.RETRY_IMMEDIATELY;
+    } else {
+      // see HBASE-15983, if checkAllBytesParsed returns false, we need to try read from
+      // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
+      // so when calling tryAdvanceENtry next time we will reset the reader to the beginning
+      // and read.
+      currentPositionOfEntry = 0;
+      currentPositionOfReader = 0;
+      state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
+      return HasNext.RETRY;
+    }
+  }
+
+  private HasNext tryAdvanceEntry() {
+    HasNext prepared = prepareReader();
+    if (prepared != HasNext.YES) {
+      return prepared;
+    }
     Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
@@ -292,46 +346,16 @@ class WALEntryStream implements Closeable {
         return HasNext.RETRY_IMMEDIATELY;
       case EOF_AND_RESET:
       case EOF_AND_RESET_COMPRESSION:
-        if (!beingWritten) {
-          // no more entries in this log file, and the file is already closed, i.e, rolled
-          // Before dequeuing, we should always get one more attempt at reading.
-          // This is in case more entries came in after we opened the reader, and the log is rolled
-          // while we were reading. See HBASE-6758
-          try {
-            reader.resetTo(currentPositionOfEntry, state.resetCompression());
-          } catch (IOException e) {
-            LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
-              currentPositionOfEntry, state.resetCompression(), e);
-            // just leave the state as is, next time we will try to reset it again, but there is a
-            // nasty problem is that, we will still reach here finally and try reset again to see if
-            // the log has been fully replicated, which is redundant, can be optimized later
-            return HasNext.RETRY;
-          }
-          Pair<WALTailingReader.State, Boolean> p = readNextEntryAndRecordReaderPosition();
-          state = pair.getFirst();
-          // should not be written
-          assert !p.getSecond();
-          if (state.eof()) {
-            if (checkAllBytesParsed()) {
-              // move to the next wal file and read
-              dequeueCurrentLog();
-              return HasNext.RETRY_IMMEDIATELY;
-            } else {
-              // see HBASE-15983, if checkAllBytesParsed returns false, we need to try read from
-              // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
-              // so when calling tryAdvanceENtry next time we will reset the reader to the beginning
-              // and read.
-              currentPositionOfEntry = 0;
-              currentPositionOfReader = 0;
-              state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
-              return HasNext.RETRY;
-            }
-          }
-        } else {
-          // just sleep a bit and retry to see if there are new entries coming since the file is
-          // still being written
-          return HasNext.RETRY;
-        }
+        if (beingWritten) {
+          // just sleep a bit and retry to see if there are new entries coming since the file is
+          // still being written
+          return HasNext.RETRY;
+        }
+        // no more entries in this log file, and the file is already closed, i.e, rolled
+        // Before dequeuing, we should always get one more attempt at reading.
+        // This is in case more entries came in after we opened the reader, and the log is rolled
+        // while we were reading. See HBASE-6758
+        return lastAttempt();
       case ERROR_AND_RESET:
       case ERROR_AND_RESET_COMPRESSION:
         // we have meet an error, just sleep a bit and retry again
@@ -393,10 +417,8 @@ class WALEntryStream implements Closeable {
         return false;
       }
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
-        stat == null ? "N/A" : stat.getLen());
-    }
+    LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
+      stat == null ? "N/A" : stat.getLen());
     metrics.incrCompletedWAL();
     return true;
   }
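
The last hunk also drops the LOG.isDebugEnabled() guard: with SLF4J parameterized logging the "{}" message is only formatted when DEBUG is actually enabled, so the guard buys nothing here (the only eagerly evaluated argument is the cheap ternary on stat). A minimal stand-alone illustration of that pattern, with a hypothetical class and method name, not HBase code:

  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  public class DebugGuardExample {
    private static final Logger LOG = LoggerFactory.getLogger(DebugGuardExample.class);

    static void logEndOfFile(String path, Long length) {
      // No isDebugEnabled() guard needed: SLF4J defers formatting of the "{}" message
      // until DEBUG is enabled; the arguments themselves (a cheap ternary here) are
      // still evaluated eagerly, which is fine because they do no real work.
      LOG.debug("Reached the end of {} and length of the file is {}", path,
        length == null ? "N/A" : length);
    }

    public static void main(String[] args) {
      logEndOfFile("/wals/example", null);
    }
  }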