From 890c89b5c0651fa74bec0d479c019e13e1ffb87e Mon Sep 17 00:00:00 2001
From: Duo Zhang
Date: Tue, 28 Feb 2023 10:07:14 +0800
Subject: [PATCH] HBASE-27668 PB's parseDelimitedFrom can successfully return when there are not enough bytes (#5059)

Signed-off-by: Viraj Jasani
(cherry picked from commit d1fede72c340f6eeb145410e88b089e636fd6f5e)
---
 .../hbase/shaded/protobuf/ProtobufUtil.java   |  50 ++++++++
 .../shaded/protobuf/TestProtobufUtil.java     |  19 +++
 .../regionserver/wal/ProtobufLogReader.java   | 110 +++++++++++-------
 .../wal/AbstractTestProtobufLog.java          |   5 +
 4 files changed, 145 insertions(+), 39 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 2584216aa09..5280391fe4b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.shaded.protobuf;
 import static org.apache.hadoop.hbase.protobuf.ProtobufMagic.PB_MAGIC;
 
 import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Constructor;
@@ -130,6 +131,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
 import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 import org.apache.hbase.thirdparty.com.google.protobuf.Service;
@@ -3535,4 +3537,52 @@ public final class ProtobufUtil {
       .setLogMessage(balancerRejectionsRequest.toByteString()).build();
   }
 
+  /**
+   * Check whether this IPBE indicates EOF or not.
+   * <p/>
+   * We check the exception message: if it is likely the one produced by
+   * InvalidProtocolBufferException.truncatedMessage, we consider it an EOF, otherwise not.
+   */
+  public static boolean isEOF(InvalidProtocolBufferException e) {
+    return e.getMessage().contains("input has been truncated");
+  }
+
+  /**
+   * This is a wrapper around the PB message's parseDelimitedFrom. The difference is that, if we
+   * cannot determine whether there are enough bytes in the stream, i.e. the available method does
+   * not have a valid return value, we read all the bytes into a byte array first and then parse
+   * the PB message with {@link Parser#parseFrom(byte[])} instead of calling
+   * {@link Parser#parseDelimitedFrom(InputStream)} directly. This is because, even when there are
+   * not enough bytes, {@link Parser#parseDelimitedFrom(InputStream)} can still return without any
+   * error and just leave us a partial PB message.
+   * @return The PB message if we can parse it successfully; otherwise an exception is always
+   *         thrown, this method never returns {@code null}.
+   */
+  public static <T extends Message> T parseDelimitedFrom(InputStream in, Parser<T> parser)
+    throws IOException {
+    int firstByte = in.read();
+    if (firstByte < 0) {
+      throw new EOFException("EOF while reading message size");
+    }
+    int size = CodedInputStream.readRawVarint32(firstByte, in);
+    int available = in.available();
+    if (available > 0) {
+      if (available < size) {
+        throw new EOFException("Available bytes not enough for parsing PB message, expect at least "
+          + size + " bytes, but only " + available + " bytes available");
+      }
+      // this piece of code is copied from GeneratedMessageV3.parseFrom
+      try {
+        return parser.parseFrom(ByteStreams.limit(in, size));
+      } catch (InvalidProtocolBufferException e) {
+        throw e.unwrapIOException();
+      }
+    } else {
+      // this usually means the stream does not have a proper available implementation, let's read
+      // the content into a byte array before parsing
+      byte[] bytes = new byte[size];
+      ByteStreams.readFully(in, bytes);
+      return parser.parseFrom(bytes);
+    }
+  }
 }
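
For context, the failure mode the wrapper above guards against is easy to reproduce outside the WAL code. The sketch below is illustrative only and is not part of the patch (the class name is invented; it only assumes the shaded BytesValue message and the new helper are on the classpath): a writer dies after flushing the length prefix but before the message body, the stock parseDelimitedFrom silently returns an empty message, and the new wrapper surfaces an EOFException instead.

    import java.io.ByteArrayInputStream;
    import java.io.EOFException;

    import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
    import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;

    public class TruncatedDelimitedSketch {
      public static void main(String[] args) throws Exception {
        // A delimited stream that promises a 12-byte message but carries no body at all,
        // e.g. because the writer crashed right after the length prefix was flushed.
        byte[] truncated = new byte[] { 12 };

        // The stock parser reads whatever fields it can find before EOF and returns an empty
        // message without reporting any problem.
        BytesValue silent =
          BytesValue.parser().parseDelimitedFrom(new ByteArrayInputStream(truncated));
        System.out.println("silently parsed, value size = " + silent.getValue().size()); // 0

        // The wrapper notices that the promised bytes are missing and fails fast.
        try {
          ProtobufUtil.parseDelimitedFrom(new ByteArrayInputStream(truncated), BytesValue.parser());
        } catch (EOFException expected) {
          System.out.println("wrapper threw: " + expected.getMessage());
        }
      }
    }
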
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
index b27d832ee8c..fc442b8998d 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
@@ -53,6 +54,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.protobuf.Any;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -574,4 +576,21 @@ public class TestProtobufUtil {
     List<Tag> decodedTags = PrivateCellUtil.getTags(decodedCell);
     assertEquals(0, decodedTags.size());
   }
+
+  /**
+   * Used to confirm that we only consider truncatedMessage as EOF
+   */
+  @Test
+  public void testIsEOF() throws Exception {
+    for (Method method : InvalidProtocolBufferException.class.getDeclaredMethods()) {
+      if (
+        method.getParameterCount() == 0
+          && method.getReturnType() == InvalidProtocolBufferException.class
+      ) {
+        method.setAccessible(true);
+        InvalidProtocolBufferException e = (InvalidProtocolBufferException) method.invoke(null);
+        assertEquals(method.getName().equals("truncatedMessage"), ProtobufUtil.isEOF(e));
+      }
+    }
+  }
 }
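
The other situation the helper's Javadoc calls out, a stream whose available() cannot be trusted, can be exercised in isolation as well. This sketch is again illustrative only (the class and variable names are invented for the example): it wraps a perfectly good stream in one that always reports 0 available bytes, which is how some filesystem streams behave, and shows that the helper still parses the message correctly by falling back to buffering the promised bytes.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
    import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
    import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;

    public class UnreliableAvailableSketch {
      public static void main(String[] args) throws IOException {
        BytesValue msg =
          BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("hello wal")).build();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        msg.writeDelimitedTo(out);

        InputStream backing = new ByteArrayInputStream(out.toByteArray());
        // InputStream's default available() always returns 0, so the helper cannot tell whether
        // enough bytes are buffered and must not trust Parser#parseDelimitedFrom.
        InputStream noAvailable = new InputStream() {
          @Override
          public int read() throws IOException {
            return backing.read();
          }
        };

        // The helper falls back to reading `size` bytes into an array and Parser#parseFrom(byte[]).
        BytesValue parsed = ProtobufUtil.parseDelimitedFrom(noAvailable, BytesValue.parser());
        System.out.println(parsed.getValue().toStringUtf8()); // hello wal
      }
    }
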
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index dd65b6a12d0..4ee88102588 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -35,8 +35,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
-import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
 import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -336,60 +334,46 @@ public class ProtobufLogReader extends ReaderBase {
 
   @Override
   protected boolean readNext(Entry entry) throws IOException {
+    resetCompression = false;
     // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
     long originalPosition = this.inputStream.getPos();
     if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
       LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
       return false;
     }
-    WALKey.Builder builder = WALKey.newBuilder();
-    long size = 0;
     boolean resetPosition = false;
-    // by default, we should reset the compression when seeking back after reading something
-    resetCompression = true;
     try {
-      long available = -1;
+      WALKey walKey;
       try {
-        int firstByte = this.inputStream.read();
-        if (firstByte == -1) {
-          throw new EOFException();
+        walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALKey.parser());
+      } catch (InvalidProtocolBufferException e) {
+        if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
+          // only rethrow EOF if it indicates an EOF, or we have reached the partial WALTrailer
+          resetPosition = true;
+          throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
+            + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
+        } else {
+          throw e;
         }
-        size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
-        // available may be < 0 on local fs for instance. If so, can't depend on it.
-        available = this.inputStream.available();
-        if (available > 0 && available < size) {
-          // if we quit here, we have just read the length, no actual data yet, which means we
-          // haven't put anything into the compression dictionary yet, so when seeking back to the
-          // last good position, we do not need to reset compression context.
-          // This is very useful for saving the extra effort for reconstructing the compression
-          // dictionary, where we need to read from the beginning instead of just seek to the
-          // position, as DFSInputStream implement the available method, so in most cases we will
-          // reach here if there are not enough data.
-          resetCompression = false;
-          throw new EOFException("Available stream not enough for edit, "
-            + "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= "
-            + size + " at offset = " + this.inputStream.getPos());
-        }
-        ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size), (int) size);
-      } catch (InvalidProtocolBufferException ipbe) {
-        resetPosition = true;
-        throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
-          + originalPosition + ", currentPosition=" + this.inputStream.getPos() + ", messageSize="
-          + size + ", currentAvailable=" + available).initCause(ipbe);
+      } catch (EOFException e) {
+        // append more detailed information
+        throw (EOFException) new EOFException("EOF while reading WAL key; originalPosition="
+          + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
       }
-      if (!builder.isInitialized()) {
-        // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
-        // If we can get the KV count, we could, theoretically, try to get next record.
-        throw new EOFException("Partial PB while reading WAL, "
-          + "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
-      }
-      WALKey walKey = builder.build();
       entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
       if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
         LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}",
current offset={}", this.inputStream.getPos()); return true; } + // Starting from here, we will start to read cells, which will change the content in + // compression dictionary, so if we fail in the below operations, when resetting, we also need + // to clear the compression context, and read from the beginning to reconstruct the + // compression dictionary, instead of seeking to the position directly. + // This is very useful for saving the extra effort for reconstructing the compression + // dictionary, as DFSInputStream implement the available method, so in most cases we will + // not reach here if there are not enough data. + resetCompression = true; int expectedCells = walKey.getFollowingKvCount(); long posBefore = this.inputStream.getPos(); try { @@ -468,6 +452,54 @@ public class ProtobufLogReader extends ReaderBase { return null; } + /** + * This is used to determine whether we have already reached the WALTrailer. As the size and magic + * are at the end of the WAL file, it is possible that these two options are missing while + * writing, so we will consider there is no trailer. And when we actually reach the WALTrailer, we + * will try to decode it as WALKey and we will fail but the error could vary as it is parsing + * WALTrailer actually. + * @return whether this is a WALTrailer and we should throw EOF to upper layer the file is done + */ + private boolean isWALTrailer(long startPosition) throws IOException { + // We have nothing in the WALTrailer PB message now so its size is just a int length size and a + // magic at the end + int trailerSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT; + if (fileLength - startPosition >= trailerSize) { + // We still have more than trailerSize bytes before reaching the EOF so this is not a trailer. + // We also test for == here because if this is a valid trailer, we can read it while opening + // the reader so we should not reach here + return false; + } + inputStream.seek(startPosition); + for (int i = 0; i < 4; i++) { + int r = inputStream.read(); + if (r == -1) { + // we have reached EOF while reading the length, and all bytes read are 0, so we assume this + // is a partial trailer + return true; + } + if (r != 0) { + // the length is not 0, should not be a trailer + return false; + } + } + for (int i = 0; i < PB_WAL_COMPLETE_MAGIC.length; i++) { + int r = inputStream.read(); + if (r == -1) { + // we have reached EOF while reading the magic, and all bytes read are matched, so we assume + // this is a partial trailer + return true; + } + if (r != (PB_WAL_COMPLETE_MAGIC[i] & 0xFF)) { + // does not match magic, should not be a trailer + return false; + } + } + // in fact we should not reach here, as this means the trailer bytes are all matched and + // complete, then we should not call this method... 
+    return true;
+  }
+
   @Override
   protected void seekOnFs(long pos) throws IOException {
     this.inputStream.seek(pos);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
index fac8970877b..00e454dd308 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
@@ -52,6 +52,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
+
 /**
  * WAL tests that can be reused across providers.
  */
@@ -110,6 +112,9 @@ public abstract class AbstractTestProtobufLog {
    */
   @Test
   public void testWALTrailer() throws IOException {
+    // make sure that the size for WALTrailer is 0, we need this assumption when reading partial
+    // WALTrailer
+    assertEquals(0, WALTrailer.newBuilder().build().getSerializedSize());
     // read With trailer.
     doRead(true);
     // read without trailer
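
To make the connection between the new assertion above and isWALTrailer() explicit: because an empty WALTrailer serializes to zero protobuf bytes, the tail of a cleanly closed WAL is just a 4-byte length of 0 followed by the completion magic, and a crash while closing can leave behind any prefix of that tail. The sketch below is illustrative only and not part of the patch; the placeholder magic stands in for PB_WAL_COMPLETE_MAGIC, whose real value is defined by the WAL reader and writer classes.

    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;

    public class WalTailLayoutSketch {
      public static void main(String[] args) {
        // Placeholder for PB_WAL_COMPLETE_MAGIC; the real bytes live in the WAL reader/writer.
        byte[] magic = new byte[] { 'M', 'A', 'G', 'C' };

        // The assumption asserted in testWALTrailer(): an empty trailer contributes no PB bytes.
        int trailerPbSize = WALTrailer.newBuilder().build().getSerializedSize(); // 0

        // Tail of a cleanly closed WAL: [trailer PB bytes (none)] [4-byte length = 0] [magic].
        byte[] tail = ByteBuffer.allocate(Integer.BYTES + magic.length)
          .putInt(trailerPbSize).put(magic).array();

        // isWALTrailer(startPosition) only treats a position as a (possibly partial) trailer when
        // fewer than tail.length bytes remain, then expects zero length bytes followed by magic;
        // hitting EOF part-way through that sequence is reported as EOF rather than corruption.
        System.out.println("expected tail length = " + tail.length);
      }
    }
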