diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 54c7ac42fc0..4600e095dbc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -118,6 +118,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
 
 /** An abstract IPC service. IPC calls take a single {@link Writable} as a
@@ -2668,24 +2669,69 @@ private void setupResponse(
 
   private void setupResponse(RpcCall call,
       RpcResponseHeaderProto header, Writable rv) throws IOException {
+    final byte[] response;
+    if (rv == null || (rv instanceof RpcWritable.ProtobufWrapper)) {
+      response = setupResponseForProtobuf(header, rv);
+    } else {
+      response = setupResponseForWritable(header, rv);
+    }
+    if (response.length > maxRespSize) {
+      LOG.warn("Large response size " + response.length + " for call "
+          + call.toString());
+    }
+    call.setResponse(ByteBuffer.wrap(response));
+  }
+
+  private byte[] setupResponseForWritable(
+      RpcResponseHeaderProto header, Writable rv) throws IOException {
     ResponseBuffer buf = responseBuffer.get().reset();
     try {
       RpcWritable.wrap(header).writeTo(buf);
       if (rv != null) {
         RpcWritable.wrap(rv).writeTo(buf);
       }
-      call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
+      return buf.toByteArray();
     } finally {
       // Discard a large buf and reset it back to smaller size
       // to free up heap.
       if (buf.capacity() > maxRespSize) {
-        LOG.warn("Large response size " + buf.size() + " for call "
-            + call.toString());
         buf.setCapacity(INITIAL_RESP_BUF_SIZE);
       }
     }
   }
+
+  // writing to a pre-allocated array is the most efficient way to construct
+  // a protobuf response.
+  private byte[] setupResponseForProtobuf(
+      RpcResponseHeaderProto header, Writable rv) throws IOException {
+    Message payload = (rv != null)
+        ? ((RpcWritable.ProtobufWrapper)rv).getMessage() : null;
+    int length = getDelimitedLength(header);
+    if (payload != null) {
+      length += getDelimitedLength(payload);
+    }
+    byte[] buf = new byte[length + 4];
+    CodedOutputStream cos = CodedOutputStream.newInstance(buf);
+    // the stream only supports little endian ints
+    cos.writeRawByte((byte)((length >>> 24) & 0xFF));
+    cos.writeRawByte((byte)((length >>> 16) & 0xFF));
+    cos.writeRawByte((byte)((length >>> 8) & 0xFF));
+    cos.writeRawByte((byte)((length >>> 0) & 0xFF));
+    cos.writeRawVarint32(header.getSerializedSize());
+    header.writeTo(cos);
+    if (payload != null) {
+      cos.writeRawVarint32(payload.getSerializedSize());
+      payload.writeTo(cos);
+    }
+    return buf;
+  }
+
+  private static int getDelimitedLength(Message message) {
+    int length = message.getSerializedSize();
+    return length + CodedOutputStream.computeRawVarint32Size(length);
+  }
+
   /**
    * Setup response for the IPC Call on Fatal Error from a
    * client that is using old version of Hadoop.
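
Note on the wire format: the patch writes a 4-byte big-endian total length by hand (CodedOutputStream only offers a little-endian int helper), then the header and the optional payload, each preceded by its varint-encoded size. That per-message framing is the same layout protobuf's own writeDelimitedTo()/parseDelimitedFrom() produce and consume. The sketch below is an illustration only, not part of the patch: it decodes such a frame using the protobuf 2.5-era Parser API that Hadoop builds against, and the class and parameter names are hypothetical.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

import com.google.protobuf.Message;
import com.google.protobuf.Parser;

// Hypothetical decoder, for illustration only: reads back the frame
// that setupResponseForProtobuf() writes.
final class RpcFrameDecoderSketch {
  static <H extends Message, P extends Message> void dump(
      byte[] frame, Parser<H> headerParser, Parser<P> payloadParser)
      throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
    // readInt() reads big-endian (network order), matching the four
    // manual writeRawByte() calls; the value excludes these 4 bytes.
    int declaredLength = in.readInt();
    // parseDelimitedFrom() consumes a varint length and then the message
    // body, mirroring writeRawVarint32() followed by writeTo().
    H header = headerParser.parseDelimitedFrom(in);
    P payload = payloadParser.parseDelimitedFrom(in);  // null if absent
    System.out.println("declared=" + declaredLength
        + ", header=" + header.getSerializedSize()
        + ", payload=" + (payload == null ? 0 : payload.getSerializedSize()));
  }
}

With Hadoop's generated classes this could be invoked as, e.g., RpcFrameDecoderSketch.dump(buf, RpcResponseHeaderProto.PARSER, EchoResponseProto.PARSER), PARSER being the static parser field that protobuf 2.5 generates; EchoResponseProto is only a stand-in for whatever response type the call returns.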