HBASE-20604 ProtobufLogReader#readNext can incorrectly loop to the same position in the stream until the the WAL is rolled

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Esteban Gutierrez 2018-05-18 15:11:13 -05:00 committed by Andrew Purtell
parent e5fb2f968a
commit dcdebbffdc
No known key found for this signature in database
GPG Key ID: 8597754DD5365CCD
1 changed files with 23 additions and 7 deletions

View File

@ -340,6 +340,7 @@ public class ProtobufLogReader extends ReaderBase {
} }
WALKey.Builder builder = WALKey.newBuilder(); WALKey.Builder builder = WALKey.newBuilder();
long size = 0; long size = 0;
boolean resetPosition = false;
try { try {
long available = -1; long available = -1;
try { try {
@ -358,6 +359,7 @@ public class ProtobufLogReader extends ReaderBase {
ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size), ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size),
(int)size); (int)size);
} catch (InvalidProtocolBufferException ipbe) { } catch (InvalidProtocolBufferException ipbe) {
resetPosition = true;
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" + throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
originalPosition + ", currentPosition=" + this.inputStream.getPos() + originalPosition + ", currentPosition=" + this.inputStream.getPos() +
", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe); ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
@ -375,13 +377,15 @@ public class ProtobufLogReader extends ReaderBase {
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" +
this.inputStream.getPos()); this.inputStream.getPos());
} }
continue; seekOnFs(originalPosition);
return false;
} }
int expectedCells = walKey.getFollowingKvCount(); int expectedCells = walKey.getFollowingKvCount();
long posBefore = this.inputStream.getPos(); long posBefore = this.inputStream.getPos();
try { try {
int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells); int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
if (expectedCells != actualCells) { if (expectedCells != actualCells) {
resetPosition = true;
throw new EOFException("Only read " + actualCells); // other info added in catch throw new EOFException("Only read " + actualCells); // other info added in catch
} }
} catch (Exception ex) { } catch (Exception ex) {
@ -409,16 +413,28 @@ public class ProtobufLogReader extends ReaderBase {
// If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs) // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
if (originalPosition < 0) { if (originalPosition < 0) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Encountered a malformed edit, but can't seek back to last good position because originalPosition is negative. last offset=" + this.inputStream.getPos(), eof); LOG.trace("Encountered a malformed edit, but can't seek back to last good position "
+ "because originalPosition is negative. last offset="
+ this.inputStream.getPos(), eof);
} }
throw eof; throw eof;
} }
// Else restore our position to original location in hope that next time through we will // If stuck at the same place and we got and exception, lets go back at the beginning.
// read successfully. if (inputStream.getPos() == originalPosition && resetPosition) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Encountered a malformed edit, seeking back to last good position in file, from "+ inputStream.getPos()+" to " + originalPosition, eof); LOG.trace("Encountered a malformed edit, seeking to the beginning of the WAL since "
+ "current position and original position match at " + originalPosition);
}
seekOnFs(0);
} else {
// Else restore our position to original location in hope that next time through we will
// read successfully.
if (LOG.isTraceEnabled()) {
LOG.trace("Encountered a malformed edit, seeking back to last good position in file, "
+ "from " + inputStream.getPos()+" to " + originalPosition, eof);
}
seekOnFs(originalPosition);
} }
seekOnFs(originalPosition);
return false; return false;
} }
return true; return true;