HBASE-13230 [mob] reads hang when trying to read rows with large mobs (>10MB)
This commit is contained in:
parent
8c1edeb2b8
commit
aedd0ebe9b
|
@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
||||
|
@ -92,7 +93,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
|
|||
// Call may be null because it may have timedout and been cleaned up on this side already
|
||||
if (call.responseDefaultType != null) {
|
||||
Message.Builder builder = call.responseDefaultType.newBuilderForType();
|
||||
builder.mergeDelimitedFrom(in);
|
||||
ProtobufUtil.mergeDelimitedFrom(builder, in);
|
||||
value = builder.build();
|
||||
}
|
||||
CellScanner cellBlockScanner = null;
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.codec.Codec;
|
||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
|
||||
|
@ -950,7 +951,7 @@ public class RpcClientImpl extends AbstractRpcClient {
|
|||
Message value = null;
|
||||
if (call.responseDefaultType != null) {
|
||||
Builder builder = call.responseDefaultType.newBuilderForType();
|
||||
builder.mergeDelimitedFrom(in);
|
||||
ProtobufUtil.mergeDelimitedFrom(builder, in);
|
||||
value = builder.build();
|
||||
}
|
||||
CellScanner cellBlockScanner = null;
|
||||
|
|
|
@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.protobuf;
|
|||
import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.FilterInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
|
@ -37,6 +39,7 @@ import java.util.Map.Entry;
|
|||
import java.util.NavigableSet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.protobuf.*;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -149,14 +152,6 @@ import org.apache.hadoop.security.token.Token;
|
|||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.ListMultimap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.Parser;
|
||||
import com.google.protobuf.RpcChannel;
|
||||
import com.google.protobuf.Service;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import com.google.protobuf.TextFormat;
|
||||
|
||||
/**
|
||||
* Protobufs utility.
|
||||
|
@ -2994,4 +2989,86 @@ public final class ProtobufUtil {
|
|||
|
||||
return desc.build();
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* This version of protobuf's mergeDelimitedFrom avoid the hard-coded 64MB limit for decoding
|
||||
* buffers
|
||||
* @param builder current message builder
|
||||
* @param in Inputsream with delimited protobuf data
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void mergeDelimitedFrom(Message.Builder builder, InputStream in) throws IOException {
|
||||
// This used to be builder.mergeDelimitedFrom(in);
|
||||
// but is replaced to allow us to bump the protobuf size limit.
|
||||
final int firstByte = in.read();
|
||||
if (firstByte == -1) {
|
||||
// bail out. (was return false;)
|
||||
} else {
|
||||
final int size = CodedInputStream.readRawVarint32(firstByte, in);
|
||||
final InputStream limitedInput = new LimitedInputStream(in, size);
|
||||
final CodedInputStream codedInput = CodedInputStream.newInstance(limitedInput);
|
||||
codedInput.setSizeLimit(size);
|
||||
builder.mergeFrom(codedInput);
|
||||
codedInput.checkLastTagWas(0);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is cut and paste from protobuf's package private AbstractMessageLite.
|
||||
*
|
||||
* An InputStream implementations which reads from some other InputStream
|
||||
* but is limited to a particular number of bytes. Used by
|
||||
* mergeDelimitedFrom(). This is intentionally package-private so that
|
||||
* UnknownFieldSet can share it.
|
||||
*/
|
||||
static final class LimitedInputStream extends FilterInputStream {
|
||||
private int limit;
|
||||
|
||||
LimitedInputStream(InputStream in, int limit) {
|
||||
super(in);
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int available() throws IOException {
|
||||
return Math.min(super.available(), limit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
if (limit <= 0) {
|
||||
return -1;
|
||||
}
|
||||
final int result = super.read();
|
||||
if (result >= 0) {
|
||||
--limit;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(final byte[] b, final int off, int len)
|
||||
throws IOException {
|
||||
if (limit <= 0) {
|
||||
return -1;
|
||||
}
|
||||
len = Math.min(len, limit);
|
||||
final int result = super.read(b, off, len);
|
||||
if (result >= 0) {
|
||||
limit -= result;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long skip(final long n) throws IOException {
|
||||
final long result = super.skip(Math.min(n, limit));
|
||||
if (result >= 0) {
|
||||
limit -= result;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,13 +31,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.mob.MobConstants;
|
||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -70,6 +66,7 @@ public class TestMobStoreScanner {
|
|||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
|
||||
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 100*1024*1024);
|
||||
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
}
|
||||
|
@ -136,6 +133,26 @@ public class TestMobStoreScanner {
|
|||
testGetFromArchive(true);
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testGetMassive() throws Exception {
|
||||
String TN = "testGetMassive";
|
||||
setUp(defaultThreshold, TN);
|
||||
|
||||
// Put some data 5 10, 15, 20 mb ok (this would be right below protobuf default max size of 64MB.
|
||||
// 25, 30, 40 fail. these is above protobuf max size of 64MB
|
||||
byte[] bigValue = new byte[25*1024*1024];
|
||||
|
||||
Put put = new Put(row1);
|
||||
put.add(family, qf1, bigValue);
|
||||
put.add(family, qf2, bigValue);
|
||||
put.add(family, qf3, bigValue);
|
||||
table.put(put);
|
||||
|
||||
Get g = new Get(row1);
|
||||
Result r = table.get(g);
|
||||
// should not have blown up.
|
||||
}
|
||||
|
||||
public void testGetFromFiles(boolean reversed) throws Exception {
|
||||
String TN = "testGetFromFiles" + reversed;
|
||||
setUp(defaultThreshold, TN);
|
||||
|
|
Loading…
Reference in New Issue