HBASE-24492 : Remove infinite loop from ProtobufLogReader#readNext (#1831)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
f71f1cdfa0
commit
8de8c44029
|
@ -329,108 +329,108 @@ public class ProtobufLogReader extends ReaderBase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean readNext(Entry entry) throws IOException {
|
protected boolean readNext(Entry entry) throws IOException {
|
||||||
while (true) {
|
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
|
||||||
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
|
long originalPosition = this.inputStream.getPos();
|
||||||
long originalPosition = this.inputStream.getPos();
|
if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
|
||||||
if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
|
LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
|
||||||
LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
|
return false;
|
||||||
return false;
|
|
||||||
}
|
|
||||||
WALKey.Builder builder = WALKey.newBuilder();
|
|
||||||
long size = 0;
|
|
||||||
boolean resetPosition = false;
|
|
||||||
try {
|
|
||||||
long available = -1;
|
|
||||||
try {
|
|
||||||
int firstByte = this.inputStream.read();
|
|
||||||
if (firstByte == -1) {
|
|
||||||
throw new EOFException();
|
|
||||||
}
|
|
||||||
size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
|
|
||||||
// available may be < 0 on local fs for instance. If so, can't depend on it.
|
|
||||||
available = this.inputStream.available();
|
|
||||||
if (available > 0 && available < size) {
|
|
||||||
throw new EOFException("Available stream not enough for edit, " +
|
|
||||||
"inputStream.available()= " + this.inputStream.available() + ", " +
|
|
||||||
"entry size= " + size + " at offset = " + this.inputStream.getPos());
|
|
||||||
}
|
|
||||||
ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size),
|
|
||||||
(int)size);
|
|
||||||
} catch (InvalidProtocolBufferException ipbe) {
|
|
||||||
resetPosition = true;
|
|
||||||
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
|
|
||||||
originalPosition + ", currentPosition=" + this.inputStream.getPos() +
|
|
||||||
", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
|
|
||||||
}
|
|
||||||
if (!builder.isInitialized()) {
|
|
||||||
// TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
|
|
||||||
// If we can get the KV count, we could, theoretically, try to get next record.
|
|
||||||
throw new EOFException("Partial PB while reading WAL, " +
|
|
||||||
"probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
|
|
||||||
}
|
|
||||||
WALKey walKey = builder.build();
|
|
||||||
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
|
|
||||||
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
|
|
||||||
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
|
|
||||||
this.inputStream.getPos());
|
|
||||||
seekOnFs(originalPosition);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
int expectedCells = walKey.getFollowingKvCount();
|
|
||||||
long posBefore = this.inputStream.getPos();
|
|
||||||
try {
|
|
||||||
int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
|
|
||||||
if (expectedCells != actualCells) {
|
|
||||||
resetPosition = true;
|
|
||||||
throw new EOFException("Only read " + actualCells); // other info added in catch
|
|
||||||
}
|
|
||||||
} catch (Exception ex) {
|
|
||||||
String posAfterStr = "<unknown>";
|
|
||||||
try {
|
|
||||||
posAfterStr = this.inputStream.getPos() + "";
|
|
||||||
} catch (Throwable t) {
|
|
||||||
LOG.trace("Error getting pos for error message - ignoring", t);
|
|
||||||
}
|
|
||||||
String message = " while reading " + expectedCells + " WAL KVs; started reading at "
|
|
||||||
+ posBefore + " and read up to " + posAfterStr;
|
|
||||||
IOException realEofEx = extractHiddenEof(ex);
|
|
||||||
throw (EOFException) new EOFException("EOF " + message).
|
|
||||||
initCause(realEofEx != null ? realEofEx : ex);
|
|
||||||
}
|
|
||||||
if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
|
|
||||||
LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path
|
|
||||||
+ ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
|
|
||||||
+ this.walEditsStopOffset);
|
|
||||||
throw new EOFException("Read WALTrailer while reading WALEdits");
|
|
||||||
}
|
|
||||||
} catch (EOFException eof) {
|
|
||||||
// If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
|
|
||||||
if (originalPosition < 0) {
|
|
||||||
LOG.warn("Encountered a malformed edit, but can't seek back to last good position "
|
|
||||||
+ "because originalPosition is negative. last offset={}",
|
|
||||||
this.inputStream.getPos(), eof);
|
|
||||||
throw eof;
|
|
||||||
}
|
|
||||||
// If stuck at the same place and we got and exception, lets go back at the beginning.
|
|
||||||
if (inputStream.getPos() == originalPosition) {
|
|
||||||
if (resetPosition) {
|
|
||||||
LOG.warn("Encountered a malformed edit, seeking to the beginning of the WAL since " +
|
|
||||||
"current position and original position match at {}", originalPosition);
|
|
||||||
seekOnFs(0);
|
|
||||||
} else {
|
|
||||||
LOG.debug("Reached the end of file at position {}", originalPosition);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Else restore our position to original location in hope that next time through we will
|
|
||||||
// read successfully.
|
|
||||||
LOG.warn("Encountered a malformed edit, seeking back to last good position in file, " +
|
|
||||||
"from {} to {}", inputStream.getPos(), originalPosition, eof);
|
|
||||||
seekOnFs(originalPosition);
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
WALKey.Builder builder = WALKey.newBuilder();
|
||||||
|
long size = 0;
|
||||||
|
boolean resetPosition = false;
|
||||||
|
try {
|
||||||
|
long available = -1;
|
||||||
|
try {
|
||||||
|
int firstByte = this.inputStream.read();
|
||||||
|
if (firstByte == -1) {
|
||||||
|
throw new EOFException();
|
||||||
|
}
|
||||||
|
size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
|
||||||
|
// available may be < 0 on local fs for instance. If so, can't depend on it.
|
||||||
|
available = this.inputStream.available();
|
||||||
|
if (available > 0 && available < size) {
|
||||||
|
throw new EOFException(
|
||||||
|
"Available stream not enough for edit, " + "inputStream.available()= "
|
||||||
|
+ this.inputStream.available() + ", " + "entry size= " + size + " at offset = "
|
||||||
|
+ this.inputStream.getPos());
|
||||||
|
}
|
||||||
|
ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size), (int) size);
|
||||||
|
} catch (InvalidProtocolBufferException ipbe) {
|
||||||
|
resetPosition = true;
|
||||||
|
throw (EOFException) new EOFException(
|
||||||
|
"Invalid PB, EOF? Ignoring; originalPosition=" + originalPosition + ", currentPosition="
|
||||||
|
+ this.inputStream.getPos() + ", messageSize=" + size + ", currentAvailable="
|
||||||
|
+ available).initCause(ipbe);
|
||||||
|
}
|
||||||
|
if (!builder.isInitialized()) {
|
||||||
|
// TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
|
||||||
|
// If we can get the KV count, we could, theoretically, try to get next record.
|
||||||
|
throw new EOFException(
|
||||||
|
"Partial PB while reading WAL, " + "probably an unexpected EOF, ignoring. current offset="
|
||||||
|
+ this.inputStream.getPos());
|
||||||
|
}
|
||||||
|
WALKey walKey = builder.build();
|
||||||
|
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
|
||||||
|
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
|
||||||
|
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
|
||||||
|
this.inputStream.getPos());
|
||||||
|
seekOnFs(originalPosition);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
int expectedCells = walKey.getFollowingKvCount();
|
||||||
|
long posBefore = this.inputStream.getPos();
|
||||||
|
try {
|
||||||
|
int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
|
||||||
|
if (expectedCells != actualCells) {
|
||||||
|
resetPosition = true;
|
||||||
|
throw new EOFException("Only read " + actualCells); // other info added in catch
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
String posAfterStr = "<unknown>";
|
||||||
|
try {
|
||||||
|
posAfterStr = this.inputStream.getPos() + "";
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.trace("Error getting pos for error message - ignoring", t);
|
||||||
|
}
|
||||||
|
String message =
|
||||||
|
" while reading " + expectedCells + " WAL KVs; started reading at " + posBefore
|
||||||
|
+ " and read up to " + posAfterStr;
|
||||||
|
IOException realEofEx = extractHiddenEof(ex);
|
||||||
|
throw (EOFException) new EOFException("EOF " + message).
|
||||||
|
initCause(realEofEx != null ? realEofEx : ex);
|
||||||
|
}
|
||||||
|
if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
|
||||||
|
LOG.error(
|
||||||
|
"Read WALTrailer while reading WALEdits. wal: " + this.path + ", inputStream.getPos(): "
|
||||||
|
+ this.inputStream.getPos() + ", walEditsStopOffset: " + this.walEditsStopOffset);
|
||||||
|
throw new EOFException("Read WALTrailer while reading WALEdits");
|
||||||
|
}
|
||||||
|
} catch (EOFException eof) {
|
||||||
|
// If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
|
||||||
|
if (originalPosition < 0) {
|
||||||
|
LOG.warn("Encountered a malformed edit, but can't seek back to last good position "
|
||||||
|
+ "because originalPosition is negative. last offset={}", this.inputStream.getPos(), eof);
|
||||||
|
throw eof;
|
||||||
|
}
|
||||||
|
// If stuck at the same place and we got and exception, lets go back at the beginning.
|
||||||
|
if (inputStream.getPos() == originalPosition) {
|
||||||
|
if (resetPosition) {
|
||||||
|
LOG.warn("Encountered a malformed edit, seeking to the beginning of the WAL since "
|
||||||
|
+ "current position and original position match at {}", originalPosition);
|
||||||
|
seekOnFs(0);
|
||||||
|
} else {
|
||||||
|
LOG.debug("Reached the end of file at position {}", originalPosition);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Else restore our position to original location in hope that next time through we will
|
||||||
|
// read successfully.
|
||||||
|
LOG.warn("Encountered a malformed edit, seeking back to last good position in file, "
|
||||||
|
+ "from {} to {}", inputStream.getPos(), originalPosition, eof);
|
||||||
|
seekOnFs(originalPosition);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private IOException extractHiddenEof(Exception ex) {
|
private IOException extractHiddenEof(Exception ex) {
|
||||||
|
|
Loading…
Reference in New Issue