HADOOP-9676. Make maximum RPC buffer size configurable (Colin Patrick McCabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1498737 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2013-07-01 23:17:13 +00:00
parent 6735b70b4a
commit 805e9b5b6d
4 changed files with 52 additions and 7 deletions

View File

@ -440,6 +440,9 @@ Release 2.1.0-beta - 2013-07-02
HADOOP-9619 Mark stability of .proto files (sanjay Radia) HADOOP-9619 Mark stability of .proto files (sanjay Radia)
HADOOP-9676. Make maximum RPC buffer size configurable (Colin Patrick
McCabe)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs

View File

@ -65,6 +65,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */ /** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */
public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1; public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;
public static final String IPC_MAXIMUM_DATA_LENGTH =
"ipc.maximum.data.length";
public static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 64 * 1024 * 1024;
/** How many calls per handler are allowed in the queue. */ /** How many calls per handler are allowed in the queue. */
public static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY = public static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY =
"ipc.server.handler.queue.size"; "ipc.server.handler.queue.size";

View File

@ -343,6 +343,7 @@ public abstract class Server {
private int maxQueueSize; private int maxQueueSize;
private final int maxRespSize; private final int maxRespSize;
private int socketSendBufferSize; private int socketSendBufferSize;
private final int maxDataLength;
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
volatile private boolean running = true; // true while server runs volatile private boolean running = true; // true while server runs
@ -1381,6 +1382,21 @@ public abstract class Server {
} }
} }
private void checkDataLength(int dataLength) throws IOException {
if (dataLength < 0) {
String error = "Unexpected data length " + dataLength +
"!! from " + getHostAddress();
LOG.warn(error);
throw new IOException(error);
} else if (dataLength > maxDataLength) {
String error = "Requested data length " + dataLength +
" is longer than maximum configured RPC length " +
maxDataLength + ". RPC came from " + getHostAddress();
LOG.warn(error);
throw new IOException(error);
}
}
public int readAndProcess() throws IOException, InterruptedException { public int readAndProcess() throws IOException, InterruptedException {
while (true) { while (true) {
/* Read at most one RPC. If the header is not read completely yet /* Read at most one RPC. If the header is not read completely yet
@ -1442,11 +1458,7 @@ public abstract class Server {
dataLengthBuffer.clear(); dataLengthBuffer.clear();
return 0; // ping message return 0; // ping message
} }
checkDataLength(dataLength);
if (dataLength < 0) {
LOG.warn("Unexpected data length " + dataLength + "!! from " +
getHostAddress());
}
data = ByteBuffer.allocate(dataLength); data = ByteBuffer.allocate(dataLength);
} }
@ -1981,6 +1993,8 @@ public abstract class Server {
this.rpcRequestClass = rpcRequestClass; this.rpcRequestClass = rpcRequestClass;
this.handlerCount = handlerCount; this.handlerCount = handlerCount;
this.socketSendBufferSize = 0; this.socketSendBufferSize = 0;
this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
if (queueSizePerHandler != -1) { if (queueSizePerHandler != -1) {
this.maxQueueSize = queueSizePerHandler; this.maxQueueSize = queueSizePerHandler;
} else { } else {

View File

@ -24,7 +24,9 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
@ -113,6 +115,7 @@ public class TestProtoBufRpc {
@Before @Before
public void setUp() throws IOException { // Setup server for both protocols public void setUp() throws IOException { // Setup server for both protocols
conf = new Configuration(); conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024);
// Set RPC engine to protobuf RPC engine // Set RPC engine to protobuf RPC engine
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class); RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
@ -230,4 +233,24 @@ public class TestProtoBufRpc {
re.getErrorCode().equals(RpcErrorCodeProto.ERROR_APPLICATION)); re.getErrorCode().equals(RpcErrorCodeProto.ERROR_APPLICATION));
} }
} }
@Test(timeout=6000)
public void testExtraLongRpc() throws Exception {
TestRpcService2 client = getClient2();
final String shortString = StringUtils.repeat("X", 4);
EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
.setMessage(shortString).build();
// short message goes through
EchoResponseProto echoResponse = client.echo2(null, echoRequest);
final String longString = StringUtils.repeat("X", 4096);
echoRequest = EchoRequestProto.newBuilder()
.setMessage(longString).build();
try {
echoResponse = client.echo2(null, echoRequest);
Assert.fail("expected extra-long RPC to fail");
} catch (ServiceException se) {
// expected
}
}
} }