HBASE-22340 Corrupt KeyValue is silently ignored (#207)
This commit is contained in:
parent
b06ad82b9e
commit
32462d5690
@ -35,6 +35,8 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.WritableUtils;
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.base.Function;
|
import org.apache.hbase.thirdparty.com.google.common.base.Function;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
@ -46,6 +48,8 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils
|
|||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class KeyValueUtil {
|
public class KeyValueUtil {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(KeyValueUtil.class);
|
||||||
|
|
||||||
/**************** length *********************/
|
/**************** length *********************/
|
||||||
|
|
||||||
public static int length(short rlen, byte flen, int qlen, int vlen, int tlen, boolean withTags) {
|
public static int length(short rlen, byte flen, int qlen, int vlen, int tlen, boolean withTags) {
|
||||||
@ -510,97 +514,124 @@ public class KeyValueUtil {
|
|||||||
|
|
||||||
static String bytesToHex(byte[] buf, int offset, int length) {
|
static String bytesToHex(byte[] buf, int offset, int length) {
|
||||||
String bufferContents = buf != null ? Bytes.toStringBinary(buf, offset, length) : "<null>";
|
String bufferContents = buf != null ? Bytes.toStringBinary(buf, offset, length) : "<null>";
|
||||||
return ", KeyValueBytesHex=" + bufferContents + ", offset=" + offset
|
return ", KeyValueBytesHex=" + bufferContents + ", offset=" + offset + ", length=" + length;
|
||||||
+ ", length=" + length;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void checkKeyValueBytes(byte[] buf, int offset, int length, boolean withTags) {
|
static void checkKeyValueBytes(byte[] buf, int offset, int length, boolean withTags) {
|
||||||
if (buf == null) {
|
if (buf == null) {
|
||||||
throw new IllegalArgumentException("Invalid to have null " +
|
String msg = "Invalid to have null byte array in KeyValue.";
|
||||||
"byte array in KeyValue.");
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int pos = offset, endOffset = offset + length;
|
int pos = offset, endOffset = offset + length;
|
||||||
// check the key
|
// check the key
|
||||||
if (pos + Bytes.SIZEOF_INT > endOffset) {
|
if (pos + Bytes.SIZEOF_INT > endOffset) {
|
||||||
throw new IllegalArgumentException(
|
String msg =
|
||||||
"Overflow when reading key length at position=" + pos + bytesToHex(buf, offset, length));
|
"Overflow when reading key length at position=" + pos + bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
int keyLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
|
int keyLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
|
||||||
pos += Bytes.SIZEOF_INT;
|
pos += Bytes.SIZEOF_INT;
|
||||||
if (keyLen <= 0 || pos + keyLen > endOffset) {
|
if (keyLen <= 0 || pos + keyLen > endOffset) {
|
||||||
throw new IllegalArgumentException(
|
String msg =
|
||||||
"Invalid key length in KeyValue. keyLength=" + keyLen + bytesToHex(buf, offset, length));
|
"Invalid key length in KeyValue. keyLength=" + keyLen + bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
// check the value
|
// check the value
|
||||||
if (pos + Bytes.SIZEOF_INT > endOffset) {
|
if (pos + Bytes.SIZEOF_INT > endOffset) {
|
||||||
throw new IllegalArgumentException("Overflow when reading value length at position=" + pos
|
String msg =
|
||||||
+ bytesToHex(buf, offset, length));
|
"Overflow when reading value length at position=" + pos + bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
int valLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
|
int valLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
|
||||||
pos += Bytes.SIZEOF_INT;
|
pos += Bytes.SIZEOF_INT;
|
||||||
if (valLen < 0 || pos + valLen > endOffset) {
|
if (valLen < 0 || pos + valLen > endOffset) {
|
||||||
throw new IllegalArgumentException("Invalid value length in KeyValue, valueLength=" + valLen
|
String msg = "Invalid value length in KeyValue, valueLength=" + valLen +
|
||||||
+ bytesToHex(buf, offset, length));
|
bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
// check the row
|
// check the row
|
||||||
if (pos + Bytes.SIZEOF_SHORT > endOffset) {
|
if (pos + Bytes.SIZEOF_SHORT > endOffset) {
|
||||||
throw new IllegalArgumentException(
|
String msg =
|
||||||
"Overflow when reading row length at position=" + pos + bytesToHex(buf, offset, length));
|
"Overflow when reading row length at position=" + pos + bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
short rowLen = Bytes.toShort(buf, pos, Bytes.SIZEOF_SHORT);
|
short rowLen = Bytes.toShort(buf, pos, Bytes.SIZEOF_SHORT);
|
||||||
pos += Bytes.SIZEOF_SHORT;
|
pos += Bytes.SIZEOF_SHORT;
|
||||||
if (rowLen < 0 || pos + rowLen > endOffset) {
|
if (rowLen < 0 || pos + rowLen > endOffset) {
|
||||||
throw new IllegalArgumentException(
|
String msg =
|
||||||
"Invalid row length in KeyValue, rowLength=" + rowLen + bytesToHex(buf, offset, length));
|
"Invalid row length in KeyValue, rowLength=" + rowLen + bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
pos += rowLen;
|
pos += rowLen;
|
||||||
// check the family
|
// check the family
|
||||||
if (pos + Bytes.SIZEOF_BYTE > endOffset) {
|
if (pos + Bytes.SIZEOF_BYTE > endOffset) {
|
||||||
throw new IllegalArgumentException("Overflow when reading family length at position=" + pos
|
String msg = "Overflow when reading family length at position=" + pos +
|
||||||
+ bytesToHex(buf, offset, length));
|
bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
int familyLen = buf[pos];
|
int familyLen = buf[pos];
|
||||||
pos += Bytes.SIZEOF_BYTE;
|
pos += Bytes.SIZEOF_BYTE;
|
||||||
if (familyLen < 0 || pos + familyLen > endOffset) {
|
if (familyLen < 0 || pos + familyLen > endOffset) {
|
||||||
throw new IllegalArgumentException("Invalid family length in KeyValue, familyLength="
|
String msg = "Invalid family length in KeyValue, familyLength=" + familyLen +
|
||||||
+ familyLen + bytesToHex(buf, offset, length));
|
bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
pos += familyLen;
|
pos += familyLen;
|
||||||
// check the qualifier
|
// check the qualifier
|
||||||
int qualifierLen = keyLen - Bytes.SIZEOF_SHORT - rowLen - Bytes.SIZEOF_BYTE - familyLen
|
int qualifierLen = keyLen - Bytes.SIZEOF_SHORT - rowLen - Bytes.SIZEOF_BYTE - familyLen
|
||||||
- Bytes.SIZEOF_LONG - Bytes.SIZEOF_BYTE;
|
- Bytes.SIZEOF_LONG - Bytes.SIZEOF_BYTE;
|
||||||
if (qualifierLen < 0 || pos + qualifierLen > endOffset) {
|
if (qualifierLen < 0 || pos + qualifierLen > endOffset) {
|
||||||
throw new IllegalArgumentException("Invalid qualifier length in KeyValue, qualifierLen="
|
String msg = "Invalid qualifier length in KeyValue, qualifierLen=" + qualifierLen +
|
||||||
+ qualifierLen + bytesToHex(buf, offset, length));
|
bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
pos += qualifierLen;
|
pos += qualifierLen;
|
||||||
// check the timestamp
|
// check the timestamp
|
||||||
if (pos + Bytes.SIZEOF_LONG > endOffset) {
|
if (pos + Bytes.SIZEOF_LONG > endOffset) {
|
||||||
throw new IllegalArgumentException(
|
String msg =
|
||||||
"Overflow when reading timestamp at position=" + pos + bytesToHex(buf, offset, length));
|
"Overflow when reading timestamp at position=" + pos + bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
long timestamp = Bytes.toLong(buf, pos, Bytes.SIZEOF_LONG);
|
long timestamp = Bytes.toLong(buf, pos, Bytes.SIZEOF_LONG);
|
||||||
if (timestamp < 0) {
|
if (timestamp < 0) {
|
||||||
throw new IllegalArgumentException(
|
String msg =
|
||||||
"Timestamp cannot be negative, ts=" + timestamp + bytesToHex(buf, offset, length));
|
"Timestamp cannot be negative, ts=" + timestamp + bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
pos += Bytes.SIZEOF_LONG;
|
pos += Bytes.SIZEOF_LONG;
|
||||||
// check the type
|
// check the type
|
||||||
if (pos + Bytes.SIZEOF_BYTE > endOffset) {
|
if (pos + Bytes.SIZEOF_BYTE > endOffset) {
|
||||||
throw new IllegalArgumentException(
|
String msg =
|
||||||
"Overflow when reading type at position=" + pos + bytesToHex(buf, offset, length));
|
"Overflow when reading type at position=" + pos + bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
byte type = buf[pos];
|
byte type = buf[pos];
|
||||||
if (!Type.isValidType(type)) {
|
if (!Type.isValidType(type)) {
|
||||||
throw new IllegalArgumentException(
|
String msg = "Invalid type in KeyValue, type=" + type + bytesToHex(buf, offset, length);
|
||||||
"Invalid type in KeyValue, type=" + type + bytesToHex(buf, offset, length));
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
pos += Bytes.SIZEOF_BYTE;
|
pos += Bytes.SIZEOF_BYTE;
|
||||||
// check the value
|
// check the value
|
||||||
if (pos + valLen > endOffset) {
|
if (pos + valLen > endOffset) {
|
||||||
throw new IllegalArgumentException(
|
String msg =
|
||||||
"Overflow when reading value part at position=" + pos + bytesToHex(buf, offset, length));
|
"Overflow when reading value part at position=" + pos + bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
pos += valLen;
|
pos += valLen;
|
||||||
// check the tags
|
// check the tags
|
||||||
@ -609,39 +640,55 @@ public class KeyValueUtil {
|
|||||||
// withTags is true but no tag in the cell.
|
// withTags is true but no tag in the cell.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (pos + Bytes.SIZEOF_SHORT > endOffset) {
|
pos = checkKeyValueTagBytes(buf, offset, length, pos, endOffset);
|
||||||
throw new IllegalArgumentException("Overflow when reading tags length at position=" + pos
|
|
||||||
+ bytesToHex(buf, offset, length));
|
|
||||||
}
|
|
||||||
short tagsLen = Bytes.toShort(buf, pos);
|
|
||||||
pos += Bytes.SIZEOF_SHORT;
|
|
||||||
if (tagsLen < 0 || pos + tagsLen > endOffset) {
|
|
||||||
throw new IllegalArgumentException("Invalid tags length in KeyValue at position="
|
|
||||||
+ (pos - Bytes.SIZEOF_SHORT) + bytesToHex(buf, offset, length));
|
|
||||||
}
|
|
||||||
int tagsEndOffset = pos + tagsLen;
|
|
||||||
for (; pos < tagsEndOffset;) {
|
|
||||||
if (pos + Tag.TAG_LENGTH_SIZE > endOffset) {
|
|
||||||
throw new IllegalArgumentException("Overflow when reading tag length at position=" + pos
|
|
||||||
+ bytesToHex(buf, offset, length));
|
|
||||||
}
|
|
||||||
short tagLen = Bytes.toShort(buf, pos);
|
|
||||||
pos += Tag.TAG_LENGTH_SIZE;
|
|
||||||
// tagLen contains one byte tag type, so must be not less than 1.
|
|
||||||
if (tagLen < 1 || pos + tagLen > endOffset) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
"Invalid tag length at position=" + (pos - Tag.TAG_LENGTH_SIZE) + ", tagLength="
|
|
||||||
+ tagLen + bytesToHex(buf, offset, length));
|
|
||||||
}
|
|
||||||
pos += tagLen;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (pos != endOffset) {
|
if (pos != endOffset) {
|
||||||
throw new IllegalArgumentException("Some redundant bytes in KeyValue's buffer, startOffset="
|
String msg = "Some redundant bytes in KeyValue's buffer, startOffset=" + pos + ", endOffset="
|
||||||
+ pos + ", endOffset=" + endOffset + bytesToHex(buf, offset, length));
|
+ endOffset + bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static int checkKeyValueTagBytes(byte[] buf, int offset, int length, int pos,
|
||||||
|
int endOffset) {
|
||||||
|
if (pos + Bytes.SIZEOF_SHORT > endOffset) {
|
||||||
|
String msg = "Overflow when reading tags length at position=" + pos +
|
||||||
|
bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
|
}
|
||||||
|
short tagsLen = Bytes.toShort(buf, pos);
|
||||||
|
pos += Bytes.SIZEOF_SHORT;
|
||||||
|
if (tagsLen < 0 || pos + tagsLen > endOffset) {
|
||||||
|
String msg = "Invalid tags length in KeyValue at position=" + (pos - Bytes.SIZEOF_SHORT)
|
||||||
|
+ bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
|
}
|
||||||
|
int tagsEndOffset = pos + tagsLen;
|
||||||
|
for (; pos < tagsEndOffset;) {
|
||||||
|
if (pos + Tag.TAG_LENGTH_SIZE > endOffset) {
|
||||||
|
String msg = "Overflow when reading tag length at position=" + pos +
|
||||||
|
bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
|
}
|
||||||
|
short tagLen = Bytes.toShort(buf, pos);
|
||||||
|
pos += Tag.TAG_LENGTH_SIZE;
|
||||||
|
// tagLen contains one byte tag type, so must be not less than 1.
|
||||||
|
if (tagLen < 1 || pos + tagLen > endOffset) {
|
||||||
|
String msg =
|
||||||
|
"Invalid tag length at position=" + (pos - Tag.TAG_LENGTH_SIZE) + ", tagLength="
|
||||||
|
+ tagLen + bytesToHex(buf, offset, length);
|
||||||
|
LOG.warn(msg);
|
||||||
|
throw new IllegalArgumentException(msg);
|
||||||
|
}
|
||||||
|
pos += tagLen;
|
||||||
|
}
|
||||||
|
return pos;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a KeyValue reading from the raw InputStream. Named
|
* Create a KeyValue reading from the raw InputStream. Named
|
||||||
* <code>createKeyValueFromInputStream</code> so doesn't clash with {@link #create(DataInput)}
|
* <code>createKeyValueFromInputStream</code> so doesn't clash with {@link #create(DataInput)}
|
||||||
|
@ -333,9 +333,7 @@ public class ProtobufLogReader extends ReaderBase {
|
|||||||
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
|
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
|
||||||
long originalPosition = this.inputStream.getPos();
|
long originalPosition = this.inputStream.getPos();
|
||||||
if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
|
if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
|
||||||
LOG.trace("Reached end of expected edits area at offset " + originalPosition);
|
|
||||||
}
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
WALKey.Builder builder = WALKey.newBuilder();
|
WALKey.Builder builder = WALKey.newBuilder();
|
||||||
@ -373,10 +371,8 @@ public class ProtobufLogReader extends ReaderBase {
|
|||||||
WALKey walKey = builder.build();
|
WALKey walKey = builder.build();
|
||||||
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
|
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
|
||||||
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
|
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
|
||||||
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" +
|
this.inputStream.getPos());
|
||||||
this.inputStream.getPos());
|
|
||||||
}
|
|
||||||
seekOnFs(originalPosition);
|
seekOnFs(originalPosition);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -393,9 +389,7 @@ public class ProtobufLogReader extends ReaderBase {
|
|||||||
try {
|
try {
|
||||||
posAfterStr = this.inputStream.getPos() + "";
|
posAfterStr = this.inputStream.getPos() + "";
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("Error getting pos for error message - ignoring", t);
|
||||||
LOG.trace("Error getting pos for error message - ignoring", t);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
String message = " while reading " + expectedCells + " WAL KVs; started reading at "
|
String message = " while reading " + expectedCells + " WAL KVs; started reading at "
|
||||||
+ posBefore + " and read up to " + posAfterStr;
|
+ posBefore + " and read up to " + posAfterStr;
|
||||||
@ -412,27 +406,21 @@ public class ProtobufLogReader extends ReaderBase {
|
|||||||
} catch (EOFException eof) {
|
} catch (EOFException eof) {
|
||||||
// If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
|
// If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
|
||||||
if (originalPosition < 0) {
|
if (originalPosition < 0) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.warn("Encountered a malformed edit, but can't seek back to last good position "
|
||||||
LOG.trace("Encountered a malformed edit, but can't seek back to last good position "
|
+ "because originalPosition is negative. last offset={}",
|
||||||
+ "because originalPosition is negative. last offset="
|
this.inputStream.getPos(), eof);
|
||||||
+ this.inputStream.getPos(), eof);
|
|
||||||
}
|
|
||||||
throw eof;
|
throw eof;
|
||||||
}
|
}
|
||||||
// If stuck at the same place and we got and exception, lets go back at the beginning.
|
// If stuck at the same place and we got and exception, lets go back at the beginning.
|
||||||
if (inputStream.getPos() == originalPosition && resetPosition) {
|
if (inputStream.getPos() == originalPosition && resetPosition) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.warn("Encountered a malformed edit, seeking to the beginning of the WAL since "
|
||||||
LOG.trace("Encountered a malformed edit, seeking to the beginning of the WAL since "
|
+ "current position and original position match at {}", originalPosition);
|
||||||
+ "current position and original position match at " + originalPosition);
|
|
||||||
}
|
|
||||||
seekOnFs(0);
|
seekOnFs(0);
|
||||||
} else {
|
} else {
|
||||||
// Else restore our position to original location in hope that next time through we will
|
// Else restore our position to original location in hope that next time through we will
|
||||||
// read successfully.
|
// read successfully.
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.warn("Encountered a malformed edit, seeking back to last good position in file, "
|
||||||
LOG.trace("Encountered a malformed edit, seeking back to last good position in file, "
|
+ "from {} to {}", inputStream.getPos(), originalPosition, eof);
|
||||||
+ "from " + inputStream.getPos()+" to " + originalPosition, eof);
|
|
||||||
}
|
|
||||||
seekOnFs(originalPosition);
|
seekOnFs(originalPosition);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user