HBASE-15212 RRCServer should enforce max request size
This commit is contained in:
parent
d07230a759
commit
b697f53be3
@ -262,15 +262,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||||||
|
|
||||||
protected HBaseRPCErrorHandler errorHandler = null;
|
protected HBaseRPCErrorHandler errorHandler = null;
|
||||||
|
|
||||||
|
static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
|
||||||
private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
|
private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
|
||||||
private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
|
private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
|
||||||
|
|
||||||
/** Default value for above params */
|
/** Default value for above params */
|
||||||
|
private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
|
||||||
private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
|
private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
|
||||||
private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
|
private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
|
||||||
|
|
||||||
private static final ObjectMapper MAPPER = new ObjectMapper();
|
private static final ObjectMapper MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
private final int maxRequestSize;
|
||||||
private final int warnResponseTime;
|
private final int warnResponseTime;
|
||||||
private final int warnResponseSize;
|
private final int warnResponseSize;
|
||||||
private final Server server;
|
private final Server server;
|
||||||
@ -1225,6 +1228,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||||||
protected String hostAddress;
|
protected String hostAddress;
|
||||||
protected int remotePort;
|
protected int remotePort;
|
||||||
ConnectionHeader connectionHeader;
|
ConnectionHeader connectionHeader;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Codec the client asked use.
|
* Codec the client asked use.
|
||||||
*/
|
*/
|
||||||
@ -1609,11 +1613,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (dataLength < 0) { // A data length of zero is legal.
|
if (dataLength < 0) { // A data length of zero is legal.
|
||||||
throw new IllegalArgumentException("Unexpected data length "
|
throw new DoNotRetryIOException("Unexpected data length "
|
||||||
+ dataLength + "!! from " + getHostAddress());
|
+ dataLength + "!! from " + getHostAddress());
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: check dataLength against some limit so that the client cannot OOM the server
|
if (dataLength > maxRequestSize) {
|
||||||
|
throw new DoNotRetryIOException("RPC data length of " + dataLength + " received from "
|
||||||
|
+ getHostAddress() + " is greater than max allowed " + maxRequestSize + ". Set \""
|
||||||
|
+ MAX_REQUEST_SIZE + "\" on server to override this limit (not recommended)");
|
||||||
|
}
|
||||||
|
|
||||||
data = ByteBuffer.allocate(dataLength);
|
data = ByteBuffer.allocate(dataLength);
|
||||||
|
|
||||||
// Increment the rpc count. This counter will be decreased when we write
|
// Increment the rpc count. This counter will be decreased when we write
|
||||||
@ -2062,6 +2071,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||||||
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
|
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
|
||||||
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
|
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
|
||||||
|
|
||||||
|
this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
|
||||||
|
|
||||||
// Start the listener here and let it bind to the port
|
// Start the listener here and let it bind to the port
|
||||||
listener = new Listener(name);
|
listener = new Listener(name);
|
||||||
this.port = listener.getAddress().getPort();
|
this.port = listener.getAddress().getPort();
|
||||||
|
@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.io.compress.GzipCodec;
|
import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.http.ConnectionClosedException;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@ -137,13 +138,17 @@ public abstract class AbstractTestIPC {
|
|||||||
static class TestRpcServer extends RpcServer {
|
static class TestRpcServer extends RpcServer {
|
||||||
|
|
||||||
TestRpcServer() throws IOException {
|
TestRpcServer() throws IOException {
|
||||||
this(new FifoRpcScheduler(CONF, 1));
|
this(new FifoRpcScheduler(CONF, 1), CONF);
|
||||||
}
|
}
|
||||||
|
|
||||||
TestRpcServer(RpcScheduler scheduler) throws IOException {
|
TestRpcServer(Configuration conf) throws IOException {
|
||||||
|
this(new FifoRpcScheduler(conf, 1), conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
|
||||||
super(null, "testRpcServer", Lists
|
super(null, "testRpcServer", Lists
|
||||||
.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
|
.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
|
||||||
"localhost", 0), CONF, scheduler);
|
"localhost", 0), conf, scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -267,7 +272,7 @@ public abstract class AbstractTestIPC {
|
|||||||
@Test
|
@Test
|
||||||
public void testRpcScheduler() throws IOException, InterruptedException {
|
public void testRpcScheduler() throws IOException, InterruptedException {
|
||||||
RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
|
RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
|
||||||
RpcServer rpcServer = new TestRpcServer(scheduler);
|
RpcServer rpcServer = new TestRpcServer(scheduler, CONF);
|
||||||
verify(scheduler).init((RpcScheduler.Context) anyObject());
|
verify(scheduler).init((RpcScheduler.Context) anyObject());
|
||||||
AbstractRpcClient client = createRpcClient(CONF);
|
AbstractRpcClient client = createRpcClient(CONF);
|
||||||
try {
|
try {
|
||||||
@ -292,6 +297,37 @@ public abstract class AbstractTestIPC {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Tests that the rpc scheduler is called when requests arrive. */
|
||||||
|
@Test
|
||||||
|
public void testRpcMaxRequestSize() throws IOException, InterruptedException {
|
||||||
|
Configuration conf = new Configuration(CONF);
|
||||||
|
conf.setInt(RpcServer.MAX_REQUEST_SIZE, 100);
|
||||||
|
RpcServer rpcServer = new TestRpcServer(conf);
|
||||||
|
AbstractRpcClient client = createRpcClient(conf);
|
||||||
|
try {
|
||||||
|
rpcServer.start();
|
||||||
|
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
|
||||||
|
// set total RPC size bigger than 100 bytes
|
||||||
|
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello.hello.hello.hello."
|
||||||
|
+ "hello.hello.hello.hello.hello.hello.hello.hello.hello.hello.hello.hello").build();
|
||||||
|
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||||
|
if (address == null) {
|
||||||
|
throw new IOException("Listener channel is closed");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
client.call(new PayloadCarryingRpcController(
|
||||||
|
CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
|
||||||
|
md.getOutputType().toProto(), User.getCurrent(), address,
|
||||||
|
new MetricsConnection.CallStats());
|
||||||
|
fail("RPC should have failed because it exceeds max request size");
|
||||||
|
} catch(ConnectionClosingException | ConnectionClosedException ex) {
|
||||||
|
// pass
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
rpcServer.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Instance of RpcServer that echoes client hostAddress back to client
|
* Instance of RpcServer that echoes client hostAddress back to client
|
||||||
*/
|
*/
|
||||||
|
Loading…
x
Reference in New Issue
Block a user