HADOOP-9676. Make maximum RPC buffer size configurable (Colin Patrick McCabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1498740 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3121145b9a
commit
dd25abb825
|
@ -171,6 +171,9 @@ Release 2.1.0-beta - 2013-07-02
|
||||||
|
|
||||||
HADOOP-9619 Mark stability of .proto files (sanjay Radia)
|
HADOOP-9619 Mark stability of .proto files (sanjay Radia)
|
||||||
|
|
||||||
|
HADOOP-9676. Make maximum RPC buffer size configurable (Colin Patrick
|
||||||
|
McCabe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs
|
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs
|
||||||
|
|
|
@ -64,6 +64,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
|
||||||
"ipc.server.read.threadpool.size";
|
"ipc.server.read.threadpool.size";
|
||||||
/** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */
|
/** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */
|
||||||
public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;
|
public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;
|
||||||
|
|
||||||
|
public static final String IPC_MAXIMUM_DATA_LENGTH =
|
||||||
|
"ipc.maximum.data.length";
|
||||||
|
|
||||||
|
public static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 64 * 1024 * 1024;
|
||||||
|
|
||||||
/** How many calls per handler are allowed in the queue. */
|
/** How many calls per handler are allowed in the queue. */
|
||||||
public static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY =
|
public static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY =
|
||||||
|
|
|
@ -340,6 +340,7 @@ public abstract class Server {
|
||||||
private int maxQueueSize;
|
private int maxQueueSize;
|
||||||
private final int maxRespSize;
|
private final int maxRespSize;
|
||||||
private int socketSendBufferSize;
|
private int socketSendBufferSize;
|
||||||
|
private final int maxDataLength;
|
||||||
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
||||||
|
|
||||||
volatile private boolean running = true; // true while server runs
|
volatile private boolean running = true; // true while server runs
|
||||||
|
@ -1377,7 +1378,22 @@ public abstract class Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkDataLength(int dataLength) throws IOException {
|
||||||
|
if (dataLength < 0) {
|
||||||
|
String error = "Unexpected data length " + dataLength +
|
||||||
|
"!! from " + getHostAddress();
|
||||||
|
LOG.warn(error);
|
||||||
|
throw new IOException(error);
|
||||||
|
} else if (dataLength > maxDataLength) {
|
||||||
|
String error = "Requested data length " + dataLength +
|
||||||
|
" is longer than maximum configured RPC length " +
|
||||||
|
maxDataLength + ". RPC came from " + getHostAddress();
|
||||||
|
LOG.warn(error);
|
||||||
|
throw new IOException(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public int readAndProcess() throws IOException, InterruptedException {
|
public int readAndProcess() throws IOException, InterruptedException {
|
||||||
while (true) {
|
while (true) {
|
||||||
/* Read at most one RPC. If the header is not read completely yet
|
/* Read at most one RPC. If the header is not read completely yet
|
||||||
|
@ -1439,11 +1455,7 @@ public abstract class Server {
|
||||||
dataLengthBuffer.clear();
|
dataLengthBuffer.clear();
|
||||||
return 0; // ping message
|
return 0; // ping message
|
||||||
}
|
}
|
||||||
|
checkDataLength(dataLength);
|
||||||
if (dataLength < 0) {
|
|
||||||
LOG.warn("Unexpected data length " + dataLength + "!! from " +
|
|
||||||
getHostAddress());
|
|
||||||
}
|
|
||||||
data = ByteBuffer.allocate(dataLength);
|
data = ByteBuffer.allocate(dataLength);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1978,6 +1990,8 @@ public abstract class Server {
|
||||||
this.rpcRequestClass = rpcRequestClass;
|
this.rpcRequestClass = rpcRequestClass;
|
||||||
this.handlerCount = handlerCount;
|
this.handlerCount = handlerCount;
|
||||||
this.socketSendBufferSize = 0;
|
this.socketSendBufferSize = 0;
|
||||||
|
this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
|
||||||
|
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
|
||||||
if (queueSizePerHandler != -1) {
|
if (queueSizePerHandler != -1) {
|
||||||
this.maxQueueSize = queueSizePerHandler;
|
this.maxQueueSize = queueSizePerHandler;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -24,7 +24,9 @@ import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
|
||||||
|
@ -113,6 +115,7 @@ public class TestProtoBufRpc {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws IOException { // Setup server for both protocols
|
public void setUp() throws IOException { // Setup server for both protocols
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
|
conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024);
|
||||||
// Set RPC engine to protobuf RPC engine
|
// Set RPC engine to protobuf RPC engine
|
||||||
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
|
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
|
||||||
|
|
||||||
|
@ -230,4 +233,24 @@ public class TestProtoBufRpc {
|
||||||
re.getErrorCode().equals(RpcErrorCodeProto.ERROR_APPLICATION));
|
re.getErrorCode().equals(RpcErrorCodeProto.ERROR_APPLICATION));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
@Test(timeout=6000)
|
||||||
|
public void testExtraLongRpc() throws Exception {
|
||||||
|
TestRpcService2 client = getClient2();
|
||||||
|
final String shortString = StringUtils.repeat("X", 4);
|
||||||
|
EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
|
||||||
|
.setMessage(shortString).build();
|
||||||
|
// short message goes through
|
||||||
|
EchoResponseProto echoResponse = client.echo2(null, echoRequest);
|
||||||
|
|
||||||
|
final String longString = StringUtils.repeat("X", 4096);
|
||||||
|
echoRequest = EchoRequestProto.newBuilder()
|
||||||
|
.setMessage(longString).build();
|
||||||
|
try {
|
||||||
|
echoResponse = client.echo2(null, echoRequest);
|
||||||
|
Assert.fail("expected extra-long RPC to fail");
|
||||||
|
} catch (ServiceException se) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue