diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index e56337f83aa..e9f417336c9 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -171,6 +171,9 @@ Release 2.1.0-beta - 2013-07-02 HADOOP-9619 Mark stability of .proto files (sanjay Radia) + HADOOP-9676. Make maximum RPC buffer size configurable (Colin Patrick + McCabe) + OPTIMIZATIONS HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index c5d86f140a5..68632503e96 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -64,6 +64,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { "ipc.server.read.threadpool.size"; /** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */ public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1; + + public static final String IPC_MAXIMUM_DATA_LENGTH = + "ipc.maximum.data.length"; + + public static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 64 * 1024 * 1024; /** How many calls per handler are allowed in the queue. */ public static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY = diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index d701b312197..5d3946e8c58 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -340,6 +340,7 @@ public abstract class Server { private int maxQueueSize; private final int maxRespSize; private int socketSendBufferSize; + private final int maxDataLength; private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm volatile private boolean running = true; // true while server runs @@ -1377,7 +1378,22 @@ public abstract class Server { } } } - + + private void checkDataLength(int dataLength) throws IOException { + if (dataLength < 0) { + String error = "Unexpected data length " + dataLength + + "!! from " + getHostAddress(); + LOG.warn(error); + throw new IOException(error); + } else if (dataLength > maxDataLength) { + String error = "Requested data length " + dataLength + + " is longer than maximum configured RPC length " + + maxDataLength + ". RPC came from " + getHostAddress(); + LOG.warn(error); + throw new IOException(error); + } + } + public int readAndProcess() throws IOException, InterruptedException { while (true) { /* Read at most one RPC. If the header is not read completely yet @@ -1439,11 +1455,7 @@ public abstract class Server { dataLengthBuffer.clear(); return 0; // ping message } - - if (dataLength < 0) { - LOG.warn("Unexpected data length " + dataLength + "!! from " + - getHostAddress()); - } + checkDataLength(dataLength); data = ByteBuffer.allocate(dataLength); } @@ -1978,6 +1990,8 @@ public abstract class Server { this.rpcRequestClass = rpcRequestClass; this.handlerCount = handlerCount; this.socketSendBufferSize = 0; + this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, + CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT); if (queueSizePerHandler != -1) { this.maxQueueSize = queueSizePerHandler; } else { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java index e7200860dc2..2ec56eb5ea9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java @@ -24,7 +24,9 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URISyntaxException; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; @@ -113,6 +115,7 @@ public class TestProtoBufRpc { @Before public void setUp() throws IOException { // Setup server for both protocols conf = new Configuration(); + conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024); // Set RPC engine to protobuf RPC engine RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class); @@ -230,4 +233,24 @@ public class TestProtoBufRpc { re.getErrorCode().equals(RpcErrorCodeProto.ERROR_APPLICATION)); } } -} \ No newline at end of file + + @Test(timeout=6000) + public void testExtraLongRpc() throws Exception { + TestRpcService2 client = getClient2(); + final String shortString = StringUtils.repeat("X", 4); + EchoRequestProto echoRequest = EchoRequestProto.newBuilder() + .setMessage(shortString).build(); + // short message goes through + EchoResponseProto echoResponse = client.echo2(null, echoRequest); + + final String longString = StringUtils.repeat("X", 4096); + echoRequest = EchoRequestProto.newBuilder() + .setMessage(longString).build(); + try { + echoResponse = client.echo2(null, echoRequest); + Assert.fail("expected extra-long RPC to fail"); + } catch (ServiceException se) { + // expected + } + } +}