HBASE-22502 Purge the logs when we reach the EOF for the last wal file when replication

This commit is contained in:
Duo Zhang 2019-05-30 15:20:30 +08:00 committed by Apache9
parent 12584ebf46
commit 6735cc13c1
1 changed files with 11 additions and 7 deletions

View File

@ -344,7 +344,7 @@ public class ProtobufLogReader extends ReaderBase {
try { try {
int firstByte = this.inputStream.read(); int firstByte = this.inputStream.read();
if (firstByte == -1) { if (firstByte == -1) {
throw new EOFException("First byte is negative at offset " + originalPosition); throw new EOFException();
} }
size = CodedInputStream.readRawVarint32(firstByte, this.inputStream); size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
// available may be < 0 on local fs for instance. If so, can't depend on it. // available may be < 0 on local fs for instance. If so, can't depend on it.
@ -412,15 +412,19 @@ public class ProtobufLogReader extends ReaderBase {
throw eof; throw eof;
} }
// If stuck at the same place and we got and exception, lets go back at the beginning. // If stuck at the same place and we got and exception, lets go back at the beginning.
if (inputStream.getPos() == originalPosition && resetPosition) { if (inputStream.getPos() == originalPosition) {
LOG.warn("Encountered a malformed edit, seeking to the beginning of the WAL since " if (resetPosition) {
+ "current position and original position match at {}", originalPosition); LOG.warn("Encountered a malformed edit, seeking to the beginning of the WAL since " +
seekOnFs(0); "current position and original position match at {}", originalPosition);
seekOnFs(0);
} else {
LOG.info("Reached the end of file at position {}", originalPosition);
}
} else { } else {
// Else restore our position to original location in hope that next time through we will // Else restore our position to original location in hope that next time through we will
// read successfully. // read successfully.
LOG.warn("Encountered a malformed edit, seeking back to last good position in file, " LOG.warn("Encountered a malformed edit, seeking back to last good position in file, " +
+ "from {} to {}", inputStream.getPos(), originalPosition, eof); "from {} to {}", inputStream.getPos(), originalPosition, eof);
seekOnFs(originalPosition); seekOnFs(originalPosition);
} }
return false; return false;