HBASE-15212 RRCServer should enforce max request size

This commit is contained in:
Enis Soztutar 2016-03-22 16:23:15 -07:00
parent 8af9ed7433
commit 3f3613a234
2 changed files with 53 additions and 6 deletions

View File

@ -260,15 +260,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected HBaseRPCErrorHandler errorHandler = null;
static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
/** Default value for above params */
private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
private static final ObjectMapper MAPPER = new ObjectMapper();
private final int maxRequestSize;
private final int warnResponseTime;
private final int warnResponseSize;
private final Server server;
@ -1239,6 +1242,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected String hostAddress;
protected int remotePort;
ConnectionHeader connectionHeader;
/**
* Codec the client asked use.
*/
@ -1623,11 +1627,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
if (dataLength < 0) { // A data length of zero is legal.
throw new IllegalArgumentException("Unexpected data length "
throw new DoNotRetryIOException("Unexpected data length "
+ dataLength + "!! from " + getHostAddress());
}
// TODO: check dataLength against some limit so that the client cannot OOM the server
if (dataLength > maxRequestSize) {
throw new DoNotRetryIOException("RPC data length of " + dataLength + " received from "
+ getHostAddress() + " is greater than max allowed " + maxRequestSize + ". Set \""
+ MAX_REQUEST_SIZE + "\" on server to override this limit (not recommended)");
}
data = ByteBuffer.allocate(dataLength);
// Increment the rpc count. This counter will be decreased when we write
@ -2071,6 +2080,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
// Start the listener here and let it bind to the port
listener = new Listener(name);
this.port = listener.getAddress().getPort();

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.apache.http.ConnectionClosedException;
import org.junit.Assert;
import org.junit.Test;
@ -137,13 +138,17 @@ public abstract class AbstractTestIPC {
static class TestRpcServer extends RpcServer {
TestRpcServer() throws IOException {
this(new FifoRpcScheduler(CONF, 1));
this(new FifoRpcScheduler(CONF, 1), CONF);
}
TestRpcServer(RpcScheduler scheduler) throws IOException {
TestRpcServer(Configuration conf) throws IOException {
this(new FifoRpcScheduler(conf, 1), conf);
}
TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
super(null, "testRpcServer", Lists
.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
"localhost", 0), CONF, scheduler);
"localhost", 0), conf, scheduler);
}
@Override
@ -267,7 +272,7 @@ public abstract class AbstractTestIPC {
@Test
public void testRpcScheduler() throws IOException, InterruptedException {
RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
RpcServer rpcServer = new TestRpcServer(scheduler);
RpcServer rpcServer = new TestRpcServer(scheduler, CONF);
verify(scheduler).init((RpcScheduler.Context) anyObject());
AbstractRpcClient client = createRpcClient(CONF);
try {
@ -292,6 +297,37 @@ public abstract class AbstractTestIPC {
}
}
/** Tests that the rpc scheduler is called when requests arrive. */
@Test
public void testRpcMaxRequestSize() throws IOException, InterruptedException {
Configuration conf = new Configuration(CONF);
conf.setInt(RpcServer.MAX_REQUEST_SIZE, 100);
RpcServer rpcServer = new TestRpcServer(conf);
AbstractRpcClient client = createRpcClient(conf);
try {
rpcServer.start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
// set total RPC size bigger than 100 bytes
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello.hello.hello.hello."
+ "hello.hello.hello.hello.hello.hello.hello.hello.hello.hello.hello.hello").build();
InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
try {
client.call(new PayloadCarryingRpcController(
CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
fail("RPC should have failed because it exceeds max request size");
} catch(ConnectionClosingException | ConnectionClosedException ex) {
// pass
}
} finally {
rpcServer.stop();
}
}
/**
* Instance of RpcServer that echoes client hostAddress back to client
*/