HBASE-24492 : Remove infinite loop from ProtobufLogReader#readNext (#1831)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Viraj Jasani 2020-06-03 22:21:22 +05:30
parent d889c7b442
commit bbfb4d432f
No known key found for this signature in database
GPG Key ID: 3AE697641452FC5D
1 changed files with 110 additions and 107 deletions

View File

@ -325,7 +325,6 @@ public class ProtobufLogReader extends ReaderBase {
@Override
protected boolean readNext(Entry entry) throws IOException {
while (true) {
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
long originalPosition = this.inputStream.getPos();
if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
@ -348,29 +347,32 @@ public class ProtobufLogReader extends ReaderBase {
// available may be < 0 on local fs for instance. If so, can't depend on it.
available = this.inputStream.available();
if (available > 0 && available < size) {
throw new EOFException("Available stream not enough for edit, " +
"inputStream.available()= " + this.inputStream.available() + ", " +
"entry size= " + size + " at offset = " + this.inputStream.getPos());
throw new EOFException(
"Available stream not enough for edit, " + "inputStream.available()= "
+ this.inputStream.available() + ", " + "entry size= " + size + " at offset = "
+ this.inputStream.getPos());
}
ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
(int)size);
ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size), (int) size);
} catch (InvalidProtocolBufferException ipbe) {
resetPosition = true;
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
originalPosition + ", currentPosition=" + this.inputStream.getPos() +
", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
throw (EOFException) new EOFException(
"Invalid PB, EOF? Ignoring; originalPosition=" + originalPosition + ", currentPosition="
+ this.inputStream.getPos() + ", messageSize=" + size + ", currentAvailable="
+ available).initCause(ipbe);
}
if (!builder.isInitialized()) {
// TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
// If we can get the KV count, we could, theoretically, try to get next record.
throw new EOFException("Partial PB while reading WAL, " +
"probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
throw new EOFException(
"Partial PB while reading WAL, " + "probably an unexpected EOF, ignoring. current offset="
+ this.inputStream.getPos());
}
WALKey walKey = builder.build();
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
if (LOG.isTraceEnabled()) {
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + this.inputStream.getPos());
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset="
+ this.inputStream.getPos());
}
seekOnFs(originalPosition);
return false;
@ -392,16 +394,17 @@ public class ProtobufLogReader extends ReaderBase {
LOG.trace("Error getting pos for error message - ignoring", t);
}
}
String message = " while reading " + expectedCells + " WAL KVs; started reading at "
+ posBefore + " and read up to " + posAfterStr;
String message =
" while reading " + expectedCells + " WAL KVs; started reading at " + posBefore
+ " and read up to " + posAfterStr;
IOException realEofEx = extractHiddenEof(ex);
throw (EOFException) new EOFException("EOF " + message).
initCause(realEofEx != null ? realEofEx : ex);
}
if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path
+ ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
+ this.walEditsStopOffset);
LOG.error(
"Read WALTrailer while reading WALEdits. wal: " + this.path + ", inputStream.getPos(): "
+ this.inputStream.getPos() + ", walEditsStopOffset: " + this.walEditsStopOffset);
throw new EOFException("Read WALTrailer while reading WALEdits");
}
} catch (EOFException eof) {
@ -409,8 +412,8 @@ public class ProtobufLogReader extends ReaderBase {
if (originalPosition < 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("Encountered a malformed edit, but can't seek back to last good position "
+ "because originalPosition is negative. last offset="
+ this.inputStream.getPos(), eof);
+ "because originalPosition is negative. last offset=" + this.inputStream.getPos(),
eof);
}
throw eof;
}
@ -425,8 +428,9 @@ public class ProtobufLogReader extends ReaderBase {
// Else restore our position to original location in hope that next time through we will
// read successfully.
if (LOG.isTraceEnabled()) {
LOG.trace("Encountered a malformed edit, seeking back to last good position in file, "
+ "from " + inputStream.getPos()+" to " + originalPosition, eof);
LOG.trace(
"Encountered a malformed edit, seeking back to last good position in file, " + "from "
+ inputStream.getPos() + " to " + originalPosition, eof);
}
seekOnFs(originalPosition);
}
@ -434,7 +438,6 @@ public class ProtobufLogReader extends ReaderBase {
}
return true;
}
}
private IOException extractHiddenEof(Exception ex) {
// There are two problems we are dealing with here. Hadoop stream throws generic exception