diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index d95d42f2f30..c6268674c5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -206,73 +206,143 @@ class WALEntryStream implements Closeable {
 
+  /**
+   * Get the reader ready before reading the next entry. If we already have a reader, reset it
+   * when the last read did not end in NORMAL state. If we do not have a reader, try to open the
+   * WAL file at the head of the queue.
+   * @return YES if the reader is ready for reading, NO if there are no more WAL files in the
+   *         queue, otherwise RETRY or RETRY_IMMEDIATELY
+   */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
     justification = "HDFS-4380")
-  private HasNext tryAdvanceEntry() {
-    if (reader == null) {
-      // try open next WAL file
-      PriorityBlockingQueue queue = logQueue.getQueue(walGroupId);
-      Path nextPath = queue.peek();
-      if (nextPath != null) {
-        setCurrentPath(nextPath);
-        // we need to test this prior to create the reader. If not, it is possible that, while
-        // opening the file, the file is still being written so its header is incomplete and we get
-        // a header EOF, but then while we test whether it is still being written, we have already
-        // flushed the data out and we consider it is not being written, and then we just skip over
-        // file, then we will lose the data written after opening...
-        boolean beingWritten =
-          walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
+  private HasNext prepareReader() {
+    if (reader != null) {
+      if (state != null && state != WALTailingReader.State.NORMAL) {
+        // reset before reading
+        LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath,
+          currentPositionOfEntry, state.resetCompression());
         try {
-          reader = WALFactory.createTailingReader(fs, nextPath, conf,
-            currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
-        } catch (WALHeaderEOFException e) {
-          if (!eofAutoRecovery) {
-            // if we do not enable EOF auto recovery, just let the upper layer retry
-            // the replication will be stuck usually, and need to be fixed manually
-            return HasNext.RETRY;
-          }
-          // we hit EOF while reading the WAL header, usually this means we can just skip over this
-          // file, but we need to be careful that whether this file is still being written, if so we
-          // should retry instead of skipping.
-          LOG.warn("EOF while trying to open WAL reader for path: {}", nextPath, e);
-          if (beingWritten) {
-            // just retry as the file is still being written, maybe next time we could read
-            // something
-            return HasNext.RETRY;
+          if (currentPositionOfEntry > 0) {
+            reader.resetTo(currentPositionOfEntry, state.resetCompression());
           } else {
-            // the file is not being written so we are safe to just skip over it
-            dequeueCurrentLog();
-            return HasNext.RETRY_IMMEDIATELY;
+            // we will read from the beginning so we should always clear the compression context
+            reader.resetTo(-1, true);
           }
-        } catch (LeaseNotRecoveredException e) {
-          // HBASE-15019 the WAL was not closed due to some hiccup.
-          LOG.warn("Try to recover the WAL lease " + nextPath, e);
-          AbstractFSWALProvider.recoverLease(conf, nextPath);
-          return HasNext.RETRY;
-        } catch (IOException | NullPointerException e) {
-          // For why we need to catch NPE here, see HDFS-4380 for more details
-          LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
+        } catch (IOException e) {
+          LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
+            currentPositionOfEntry, state.resetCompression(), e);
+          // just leave the state as is, and try resetting next time
           return HasNext.RETRY;
         }
+        // the reset succeeded, so keep using the current reader. Do not fall through to the
+        // code below, it would replace the current reader with a new one without closing it
+        return HasNext.YES;
       } else {
-        // no more files in queue, this could happen for recovered queue, or for a wal group of a
-        // sync replication peer which has already been transited to DA or S.
-        setCurrentPath(null);
-        return HasNext.NO;
+        return HasNext.YES;
       }
-    } else if (state != null && state != WALTailingReader.State.NORMAL) {
-      // reset before reading
-      try {
-        if (currentPositionOfEntry > 0) {
-          reader.resetTo(currentPositionOfEntry, state.resetCompression());
-        } else {
-          // we will read from the beginning so we should always clear the compression context
-          reader.resetTo(-1, true);
-        }
-      } catch (IOException e) {
-        LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
-          currentPositionOfEntry, state.resetCompression(), e);
-        // just leave the state as is, and try resetting next time
+    }
+    // try open next WAL file
+    PriorityBlockingQueue queue = logQueue.getQueue(walGroupId);
+    Path nextPath = queue.peek();
+    if (nextPath == null) {
+      LOG.debug("No more WAL files in queue");
+      // no more files in queue, this could happen for recovered queue, or for a wal group of a
+      // sync replication peer which has already been transited to DA or S.
+      setCurrentPath(null);
+      return HasNext.NO;
+    }
+    setCurrentPath(nextPath);
+    // we need to test this prior to create the reader. If not, it is possible that, while
+    // opening the file, the file is still being written so its header is incomplete and we get
+    // a header EOF, but then while we test whether it is still being written, we have already
+    // flushed the data out and we consider it is not being written, and then we just skip over
+    // file, then we will lose the data written after opening...
+    boolean beingWritten = walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
+    LOG.debug("Creating new reader {}, startPosition={}, beingWritten={}", nextPath,
+      currentPositionOfEntry, beingWritten);
+    try {
+      reader = WALFactory.createTailingReader(fs, nextPath, conf,
+        currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
+      return HasNext.YES;
+    } catch (WALHeaderEOFException e) {
+      if (!eofAutoRecovery) {
+        // if we do not enable EOF auto recovery, just let the upper layer retry
+        // the replication will be stuck usually, and need to be fixed manually
         return HasNext.RETRY;
       }
+      // we hit EOF while reading the WAL header, usually this means we can just skip over this
+      // file, but we need to be careful that whether this file is still being written, if so we
+      // should retry instead of skipping.
+      LOG.warn("EOF while trying to open WAL reader for path: {}, startPosition={}", nextPath,
+        currentPositionOfEntry, e);
+      if (beingWritten) {
+        // just retry as the file is still being written, maybe next time we could read
+        // something
+        return HasNext.RETRY;
+      } else {
+        // the file is not being written so we are safe to just skip over it
+        dequeueCurrentLog();
+        return HasNext.RETRY_IMMEDIATELY;
+      }
+    } catch (LeaseNotRecoveredException e) {
+      // HBASE-15019 the WAL was not closed due to some hiccup.
+      LOG.warn("Try to recover the WAL lease " + nextPath, e);
+      AbstractFSWALProvider.recoverLease(conf, nextPath);
+      return HasNext.RETRY;
+    } catch (IOException | NullPointerException e) {
+      // For why we need to catch NPE here, see HDFS-4380 for more details
+      LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
+      return HasNext.RETRY;
+    }
+  }
+
+  /**
+   * Reset the reader and read once more after hitting EOF on a WAL file which is no longer being
+   * written, in case more entries were written after we opened the reader, see HBASE-6758.
+   * @return YES if the read after the reset ends in NORMAL state, RETRY_IMMEDIATELY if the file
+   *         has been fully parsed and dequeued, otherwise RETRY
+   */
+  private HasNext lastAttempt() {
+    LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath,
+      currentPositionOfEntry, state.resetCompression());
+    try {
+      reader.resetTo(currentPositionOfEntry, state.resetCompression());
+    } catch (IOException e) {
+      LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
+        currentPositionOfEntry, state.resetCompression(), e);
+      // just leave the state as is, next time we will try to reset it again, but there is a
+      // nasty problem is that, we will still reach here finally and try reset again to see if
+      // the log has been fully replicated, which is redundant, can be optimized later
+      return HasNext.RETRY;
+    }
+    Pair pair = readNextEntryAndRecordReaderPosition();
+    state = pair.getFirst();
+    // should not be written
+    assert !pair.getSecond();
+    if (!state.eof()) {
+      // we still have something to read after reopen, so return YES. Or there is something wrong
+      // and we need to retry
+      return state == WALTailingReader.State.NORMAL ? HasNext.YES : HasNext.RETRY;
+    }
+    // No data available after reopen
+    if (checkAllBytesParsed()) {
+      // move to the next wal file and read
+      dequeueCurrentLog();
+      return HasNext.RETRY_IMMEDIATELY;
+    } else {
+      // see HBASE-15983, if checkAllBytesParsed returns false, we need to try read from
+      // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
+      // so when calling tryAdvanceEntry next time we will reset the reader to the beginning
+      // and read.
+      currentPositionOfEntry = 0;
+      currentPositionOfReader = 0;
+      state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
+      return HasNext.RETRY;
+    }
+  }
+
+  private HasNext tryAdvanceEntry() {
+    HasNext prepared = prepareReader();
+    if (prepared != HasNext.YES) {
+      return prepared;
     }
 
     Pair pair = readNextEntryAndRecordReaderPosition();
@@ -292,46 +362,16 @@ class WALEntryStream implements Closeable {
         return HasNext.RETRY_IMMEDIATELY;
       case EOF_AND_RESET:
       case EOF_AND_RESET_COMPRESSION:
-        if (!beingWritten) {
-          // no more entries in this log file, and the file is already closed, i.e, rolled
-          // Before dequeuing, we should always get one more attempt at reading.
-          // This is in case more entries came in after we opened the reader, and the log is rolled
-          // while we were reading. See HBASE-6758
-          try {
-            reader.resetTo(currentPositionOfEntry, state.resetCompression());
-          } catch (IOException e) {
-            LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
-              currentPositionOfEntry, state.resetCompression(), e);
-            // just leave the state as is, next time we will try to reset it again, but there is a
-            // nasty problem is that, we will still reach here finally and try reset again to see if
-            // the log has been fully replicated, which is redundant, can be optimized later
-            return HasNext.RETRY;
-          }
-          Pair p = readNextEntryAndRecordReaderPosition();
-          state = pair.getFirst();
-          // should not be written
-          assert !p.getSecond();
-          if (state.eof()) {
-            if (checkAllBytesParsed()) {
-              // move to the next wal file and read
-              dequeueCurrentLog();
-              return HasNext.RETRY_IMMEDIATELY;
-            } else {
-              // see HBASE-15983, if checkAllBytesParsed returns false, we need to try read from
-              // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
-              // so when calling tryAdvanceENtry next time we will reset the reader to the beginning
-              // and read.
-              currentPositionOfEntry = 0;
-              currentPositionOfReader = 0;
-              state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
-              return HasNext.RETRY;
-            }
-          }
-        } else {
+        if (beingWritten) {
           // just sleep a bit and retry to see if there are new entries coming since the file is
           // still being written
           return HasNext.RETRY;
         }
+        // no more entries in this log file, and the file is already closed, i.e, rolled
+        // Before dequeuing, we should always get one more attempt at reading.
+        // This is in case more entries came in after we opened the reader, and the log is rolled
+        // while we were reading. See HBASE-6758
+        return lastAttempt();
       case ERROR_AND_RESET:
       case ERROR_AND_RESET_COMPRESSION:
         // we have meet an error, just sleep a bit and retry again
@@ -393,10 +433,8 @@ class WALEntryStream implements Closeable {
         return false;
       }
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
-        stat == null ? "N/A" : stat.getLen());
-    }
+    LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
+      stat == null ? "N/A" : stat.getLen());
     metrics.incrCompletedWAL();
     return true;
   }