HBASE-27668 PB's parseDelimitedFrom can successfully return when there are not enough bytes (#5059)

Signed-off-by: Viraj Jasani <vjasani@apache.org>

parent 8d70786a2d
commit d1fede72c3
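
The protobuf behavior this commit works around is easy to reproduce outside HBase. The sketch below is illustrative only (it is not part of the commit and uses the stock com.google.protobuf Timestamp type rather than the shaded classes): it writes a length-delimited message, truncates the stream on a field boundary, and shows parseDelimitedFrom returning a partial message instead of failing.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

import com.google.protobuf.Timestamp;

public class TruncatedParseDemo {
  public static void main(String[] args) throws Exception {
    Timestamp ts = Timestamp.newBuilder().setSeconds(1234567890L).setNanos(999).build();
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ts.writeDelimitedTo(bos); // varint length prefix, then the message bytes
    byte[] full = bos.toByteArray();
    // Drop the trailing bytes that encode the nanos field. The length prefix
    // still claims the original size, but the remaining bytes happen to end
    // on a clean field boundary.
    byte[] truncated = Arrays.copyOf(full, full.length - 3);
    Timestamp parsed = Timestamp.parseDelimitedFrom(new ByteArrayInputStream(truncated));
    // No exception: the premature end of stream is taken as the end of the
    // message, and the nanos field is silently dropped.
    System.out.println(parsed.getNanos()); // prints 0, not 999
  }
}

This silent truncation is why the WAL reader below cannot rely on parseDelimitedFrom alone when a file may have been cut short.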

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.shaded.protobuf;
import static org.apache.hadoop.hbase.protobuf.ProtobufMagic.PB_MAGIC;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;

@@ -137,6 +138,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;

@@ -3700,4 +3702,52 @@ public final class ProtobufUtil {
      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTime()).build();
  }

  /**
   * Check whether this IPBE indicates EOF or not.
   * <p/>
   * We check the exception message: if it is likely the one produced by
   * InvalidProtocolBufferException.truncatedMessage, we consider it an EOF, otherwise not.
   */
  public static boolean isEOF(InvalidProtocolBufferException e) {
    return e.getMessage().contains("input has been truncated");
  }

  /**
   * This is a wrapper of the PB message's parseDelimitedFrom. The difference is, if we can not
   * determine whether there are enough bytes in the stream, i.e., the available method does not
   * have a valid return value, we will try to read all the bytes into a byte array first, and
   * then parse the pb message with {@link Parser#parseFrom(byte[])} instead of calling
   * {@link Parser#parseDelimitedFrom(InputStream)} directly. This is because, even if there are
   * not enough bytes, {@link Parser#parseDelimitedFrom(InputStream)} could still return without
   * any errors and just leave us a partial PB message.
   * @return The PB message if we can parse it successfully; otherwise an exception will always be
   *         thrown. This method never returns {@code null}.
   */
  public static <T extends Message> T parseDelimitedFrom(InputStream in, Parser<T> parser)
    throws IOException {
    int firstByte = in.read();
    if (firstByte < 0) {
      throw new EOFException("EOF while reading message size");
    }
    int size = CodedInputStream.readRawVarint32(firstByte, in);
    int available = in.available();
    if (available > 0) {
      if (available < size) {
        throw new EOFException("Available bytes not enough for parsing PB message, expect at least "
          + size + " bytes, but only " + available + " bytes available");
      }
      // this piece of code is copied from GeneratedMessageV3.parseFrom
      try {
        return parser.parseFrom(ByteStreams.limit(in, size));
      } catch (InvalidProtocolBufferException e) {
        throw e.unwrapIOException();
      }
    } else {
      // this usually means the stream does not have a proper available implementation, let's read
      // the content into a byte array before parsing
      byte[] bytes = new byte[size];
      ByteStreams.readFully(in, bytes);
      return parser.parseFrom(bytes);
    }
  }
}
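
A usage sketch for the new helpers (a hypothetical caller, not part of the commit; it mirrors the pattern ProtobufLogReader adopts below): parseDelimitedFrom either returns a complete message or throws, and isEOF lets the caller translate a truncated message into an EOFException.

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;

import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

public final class ReadWALKeyExample {

  private ReadWALKeyExample() {
  }

  /** Reads one length-delimited WALKey, surfacing truncation as an EOFException. */
  public static WALKey readOne(InputStream in) throws IOException {
    try {
      return ProtobufUtil.parseDelimitedFrom(in, WALKey.parser());
    } catch (InvalidProtocolBufferException e) {
      if (ProtobufUtil.isEOF(e)) {
        // the stream ended in the middle of the message
        throw (EOFException) new EOFException("Truncated PB message").initCause(e);
      }
      // anything else is real corruption, not just a short read
      throw e;
    }
  }
}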

@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
|
@ -53,6 +54,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
|||
import org.apache.hbase.thirdparty.com.google.protobuf.Any;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
|

@@ -574,4 +576,21 @@ public class TestProtobufUtil {
    List<Tag> decodedTags = PrivateCellUtil.getTags(decodedCell);
    assertEquals(0, decodedTags.size());
  }

  /**
   * Used to confirm that we only consider truncatedMessage as EOF
   */
  @Test
  public void testIsEOF() throws Exception {
    for (Method method : InvalidProtocolBufferException.class.getDeclaredMethods()) {
      if (
        method.getParameterCount() == 0
          && method.getReturnType() == InvalidProtocolBufferException.class
      ) {
        method.setAccessible(true);
        InvalidProtocolBufferException e = (InvalidProtocolBufferException) method.invoke(null);
        assertEquals(method.getName().equals("truncatedMessage"), ProtobufUtil.isEOF(e));
      }
    }
  }
}
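
To see isEOF distinguishing the truncation case in practice, here is an illustrative sketch (again not part of the commit): hand-crafted bytes declare a 10-byte field but supply only three, which triggers protobuf's truncatedMessage error.

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

public class IsEOFDemo {
  public static void main(String[] args) {
    // field 1, wire type 2 (length-delimited), declared length 10, but only
    // three payload bytes follow: a classic truncated message.
    byte[] cutMidField = { 0x0A, 0x0A, 1, 2, 3 };
    try {
      BytesValue.parseFrom(cutMidField);
    } catch (InvalidProtocolBufferException e) {
      System.out.println(ProtobufUtil.isEOF(e)); // prints true
    }
  }
}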

@@ -36,8 +36,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

@@ -358,60 +356,46 @@ public class ProtobufLogReader extends ReaderBase {

  @Override
  protected boolean readNext(Entry entry) throws IOException {
    resetCompression = false;
    // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
    long originalPosition = this.inputStream.getPos();
    if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
      LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
      return false;
    }
    WALKey.Builder builder = WALKey.newBuilder();
    long size = 0;
    boolean resetPosition = false;
    // by default, we should reset the compression when seeking back after reading something
    resetCompression = true;
    try {
      long available = -1;
      WALKey walKey;
      try {
        int firstByte = this.inputStream.read();
        if (firstByte == -1) {
          throw new EOFException();
        }
        size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
        // available may be < 0 on local fs for instance. If so, can't depend on it.
        available = this.inputStream.available();
        if (available > 0 && available < size) {
          // if we quit here, we have just read the length, no actual data yet, which means we
          // haven't put anything into the compression dictionary yet, so when seeking back to the
          // last good position, we do not need to reset compression context.
          // This is very useful for saving the extra effort of reconstructing the compression
          // dictionary, where we need to read from the beginning instead of just seeking to the
          // position, as DFSInputStream implements the available method, so in most cases we will
          // reach here if there is not enough data.
          resetCompression = false;
          throw new EOFException("Available stream not enough for edit, "
            + "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= "
            + size + " at offset = " + this.inputStream.getPos());
        }
        ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size), (int) size);
      } catch (InvalidProtocolBufferException ipbe) {
        walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALKey.parser());
      } catch (InvalidProtocolBufferException e) {
        if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
          // only rethrow EOF if it indicates an EOF, or we have reached the partial WALTrailer
          resetPosition = true;
          throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
            + originalPosition + ", currentPosition=" + this.inputStream.getPos() + ", messageSize="
            + size + ", currentAvailable=" + available).initCause(ipbe);
            + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
        } else {
          throw e;
        }
      if (!builder.isInitialized()) {
        // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
        // If we can get the KV count, we could, theoretically, try to get next record.
        throw new EOFException("Partial PB while reading WAL, "
          + "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
      } catch (EOFException e) {
        // append more detailed information
        throw (EOFException) new EOFException("EOF while reading WAL key; originalPosition="
          + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
      }
      WALKey walKey = builder.build();
      entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
      if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
        LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}",
          this.inputStream.getPos());
        return true;
      }
      // Starting from here, we will start to read cells, which will change the content in the
      // compression dictionary, so if we fail in the below operations, when resetting, we also
      // need to clear the compression context, and read from the beginning to reconstruct the
      // compression dictionary, instead of seeking to the position directly.
      // This is very useful for saving the extra effort of reconstructing the compression
      // dictionary, as DFSInputStream implements the available method, so in most cases we will
      // not reach here if there is not enough data.
      resetCompression = true;
      int expectedCells = walKey.getFollowingKvCount();
      long posBefore = this.inputStream.getPos();
      try {

@@ -490,6 +474,54 @@ public class ProtobufLogReader extends ReaderBase {
    return null;
  }

  /**
   * This is used to determine whether we have already reached the WALTrailer. As the size and
   * magic are at the end of the WAL file, it is possible that these two parts are missing while
   * writing, so we will consider there is no trailer. And when we actually reach the WALTrailer,
   * we will try to decode it as a WALKey and fail, but the error could vary as it is actually
   * parsing a WALTrailer.
   * @return whether this is a WALTrailer and we should throw EOF to the upper layer to signal
   *         that the file is done
   */
  private boolean isWALTrailer(long startPosition) throws IOException {
    // We have nothing in the WALTrailer PB message now so its size is just an int length plus the
    // magic at the end
    int trailerSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT;
    if (fileLength - startPosition >= trailerSize) {
      // We still have more than trailerSize bytes before reaching the EOF, so this is not a
      // trailer. We also test for == here because if this is a valid trailer, we can read it
      // while opening the reader, so we should not reach here.
      return false;
    }
    inputStream.seek(startPosition);
    for (int i = 0; i < 4; i++) {
      int r = inputStream.read();
      if (r == -1) {
        // we have reached EOF while reading the length, and all bytes read are 0, so we assume
        // this is a partial trailer
        return true;
      }
      if (r != 0) {
        // the length is not 0, should not be a trailer
        return false;
      }
    }
    for (int i = 0; i < PB_WAL_COMPLETE_MAGIC.length; i++) {
      int r = inputStream.read();
      if (r == -1) {
        // we have reached EOF while reading the magic, and all bytes read so far match, so we
        // assume this is a partial trailer
        return true;
      }
      if (r != (PB_WAL_COMPLETE_MAGIC[i] & 0xFF)) {
        // does not match the magic, should not be a trailer
        return false;
      }
    }
    // in fact we should not reach here, as this means the trailer bytes are all matched and
    // complete, so we should not have called this method...
    return true;
  }

  @Override
  protected void seekOnFs(long pos) throws IOException {
    this.inputStream.seek(pos);
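
For reference, the trailer bytes that isWALTrailer scans for amount to eight bytes at the end of the file. The sketch below is illustrative only: it assumes PB_WAL_COMPLETE_MAGIC is the four ASCII bytes "LAWP" and that the empty WALTrailer message serializes to zero bytes, as the test at the end of this commit asserts.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class TrailerLayoutDemo {
  // assumed to match ProtobufLogReader.PB_WAL_COMPLETE_MAGIC
  private static final byte[] PB_WAL_COMPLETE_MAGIC = { 'L', 'A', 'W', 'P' };

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // zero bytes for the empty WALTrailer message itself, then its length (0)
    // written as a 4-byte int...
    out.write(new byte[] { 0, 0, 0, 0 });
    // ...followed by the magic
    out.write(PB_WAL_COMPLETE_MAGIC);
    System.out.println(out.size()); // 8
    // A file that ends before all eight bytes are present, but whose bytes so
    // far match this pattern, is what isWALTrailer treats as a partial trailer.
  }
}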

@@ -17,6 +17,8 @@
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;

@@ -35,6 +37,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;

/**
 * WAL tests that can be reused across providers.
 */

@@ -89,6 +93,9 @@ public abstract class AbstractTestProtobufLog {
   */
  @Test
  public void testWALTrailer() throws IOException {
    // make sure that the size for WALTrailer is 0; we need this assumption when reading a
    // partial WALTrailer
    assertEquals(0, WALTrailer.newBuilder().build().getSerializedSize());
    // read with trailer
    doRead(true);
    // read without trailer