HBASE-9373 [replication] data loss because replication doesn't expect partial reads

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1519037 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2013-08-30 18:07:35 +00:00
parent ce96c187ef
commit 031b3d99a9
1 changed files with 66 additions and 47 deletions

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -34,6 +35,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.io.LimitInputStream;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.InvalidProtocolBufferException;
/**
@ -147,7 +150,7 @@ public class ProtobufLogReader extends ReaderBase {
ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
LOG.warn("No trailer found.");
LOG.trace("No trailer found.");
return false;
}
if (trailerSize < 0) {
@ -190,57 +193,73 @@ public class ProtobufLogReader extends ReaderBase {
@Override
protected boolean readNext(HLog.Entry entry) throws IOException {
while (true) {
if (trailerPresent && this.inputStream.getPos() == this.walEditsStopOffset) return false;
long originalPosition = this.inputStream.getPos();
if (trailerPresent && originalPosition == this.walEditsStopOffset) return false;
WALKey.Builder builder = WALKey.newBuilder();
boolean hasNext = false;
int size = 0;
try {
hasNext = builder.mergeDelimitedFrom(inputStream);
} catch (InvalidProtocolBufferException ipbe) {
LOG.error("Invalid PB while reading WAL, probably an unexpected EOF, ignoring", ipbe);
}
if (!hasNext) return false;
if (!builder.isInitialized()) {
// TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
// If we can get the KV count, we could, theoretically, try to get next record.
LOG.error("Partial PB while reading WAL, probably an unexpected EOF, ignoring");
return false;
}
WALKey walKey = builder.build();
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
LOG.warn("WALKey has no KVs that follow it; trying the next one");
continue;
}
int expectedCells = walKey.getFollowingKvCount();
long posBefore = this.inputStream.getPos();
try {
int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
if (expectedCells != actualCells) {
throw new EOFException("Only read " + actualCells); // other info added in catch
}
} catch (Exception ex) {
String posAfterStr = "<unknown>";
int originalAvailable = this.inputStream.available();
try {
posAfterStr = this.inputStream.getPos() + "";
} catch (Throwable t) {
LOG.trace("Error getting pos for error message - ignoring", t);
int firstByte = this.inputStream.read();
if (firstByte == -1) {
throw new EOFException("First byte is negative");
}
size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
if (this.inputStream.available() < size) {
throw new EOFException("Available stream not enough for edit, " +
"inputStream.available()= " + this.inputStream.available() + ", " +
"entry size= " + size);
}
final InputStream limitedInput = new LimitInputStream(this.inputStream, size);
builder.mergeFrom(limitedInput);
} catch (InvalidProtocolBufferException ipbe) {
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
originalPosition + ", currentPosition=" + this.inputStream.getPos() +
", messageSize=" + size + ", originalAvailable=" + originalAvailable +
", currentAvailable=" + this.inputStream.available()).initCause(ipbe);
}
String message = " while reading " + expectedCells + " WAL KVs; started reading at "
+ posBefore + " and read up to " + posAfterStr;
IOException realEofEx = extractHiddenEof(ex);
if (realEofEx != null) {
LOG.error("EOF " + message, realEofEx);
return false;
if (!builder.isInitialized()) {
// TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
// If we can get the KV count, we could, theoretically, try to get next record.
throw new EOFException("Partial PB while reading WAL, " +
"probably an unexpected EOF, ignoring");
}
message = "Error " + message;
LOG.error(message);
throw new IOException(message, ex);
}
if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path
+ ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
+ this.walEditsStopOffset);
throw new IOException("Read WALTrailer while reading WALEdits");
WALKey walKey = builder.build();
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
LOG.trace("WALKey has no KVs that follow it; trying the next one");
continue;
}
int expectedCells = walKey.getFollowingKvCount();
long posBefore = this.inputStream.getPos();
try {
int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
if (expectedCells != actualCells) {
throw new EOFException("Only read " + actualCells); // other info added in catch
}
} catch (Exception ex) {
String posAfterStr = "<unknown>";
try {
posAfterStr = this.inputStream.getPos() + "";
} catch (Throwable t) {
LOG.trace("Error getting pos for error message - ignoring", t);
}
String message = " while reading " + expectedCells + " WAL KVs; started reading at "
+ posBefore + " and read up to " + posAfterStr;
IOException realEofEx = extractHiddenEof(ex);
throw (EOFException) new EOFException("EOF " + message).
initCause(realEofEx != null ? ex : realEofEx);
}
if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path
+ ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
+ this.walEditsStopOffset);
throw new EOFException("Read WALTrailer while reading WALEdits");
}
} catch (EOFException eof) {
LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof);
seekOnFs(originalPosition);
return false;
}
return true;
}