HADOOP-13549. Eliminate intermediate buffer for server-side PB encoding. Contributed by Daryn Sharp.

This commit is contained in:
Kihwal Lee 2016-09-06 10:01:01 -05:00
parent e6fcfe28e3
commit 39d1b1d747
1 changed files with 49 additions and 3 deletions

View File

@ -119,6 +119,7 @@ import org.apache.htrace.core.Tracer;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message; import com.google.protobuf.Message;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a /** An abstract IPC service. IPC calls take a single {@link Writable} as a
@ -2752,24 +2753,69 @@ public abstract class Server {
private void setupResponse(RpcCall call, private void setupResponse(RpcCall call,
RpcResponseHeaderProto header, Writable rv) throws IOException { RpcResponseHeaderProto header, Writable rv) throws IOException {
final byte[] response;
if (rv == null || (rv instanceof RpcWritable.ProtobufWrapper)) {
response = setupResponseForProtobuf(header, rv);
} else {
response = setupResponseForWritable(header, rv);
}
if (response.length > maxRespSize) {
LOG.warn("Large response size " + response.length + " for call "
+ call.toString());
}
call.setResponse(ByteBuffer.wrap(response));
}
private byte[] setupResponseForWritable(
RpcResponseHeaderProto header, Writable rv) throws IOException {
ResponseBuffer buf = responseBuffer.get().reset(); ResponseBuffer buf = responseBuffer.get().reset();
try { try {
RpcWritable.wrap(header).writeTo(buf); RpcWritable.wrap(header).writeTo(buf);
if (rv != null) { if (rv != null) {
RpcWritable.wrap(rv).writeTo(buf); RpcWritable.wrap(rv).writeTo(buf);
} }
call.setResponse(ByteBuffer.wrap(buf.toByteArray())); return buf.toByteArray();
} finally { } finally {
// Discard a large buf and reset it back to smaller size // Discard a large buf and reset it back to smaller size
// to free up heap. // to free up heap.
if (buf.capacity() > maxRespSize) { if (buf.capacity() > maxRespSize) {
LOG.warn("Large response size " + buf.size() + " for call "
+ call.toString());
buf.setCapacity(INITIAL_RESP_BUF_SIZE); buf.setCapacity(INITIAL_RESP_BUF_SIZE);
} }
} }
} }
// writing to a pre-allocated array is the most efficient way to construct
// a protobuf response.
private byte[] setupResponseForProtobuf(
RpcResponseHeaderProto header, Writable rv) throws IOException {
Message payload = (rv != null)
? ((RpcWritable.ProtobufWrapper)rv).getMessage() : null;
int length = getDelimitedLength(header);
if (payload != null) {
length += getDelimitedLength(payload);
}
byte[] buf = new byte[length + 4];
CodedOutputStream cos = CodedOutputStream.newInstance(buf);
// the stream only supports little endian ints
cos.writeRawByte((byte)((length >>> 24) & 0xFF));
cos.writeRawByte((byte)((length >>> 16) & 0xFF));
cos.writeRawByte((byte)((length >>> 8) & 0xFF));
cos.writeRawByte((byte)((length >>> 0) & 0xFF));
cos.writeRawVarint32(header.getSerializedSize());
header.writeTo(cos);
if (payload != null) {
cos.writeRawVarint32(payload.getSerializedSize());
payload.writeTo(cos);
}
return buf;
}
private static int getDelimitedLength(Message message) {
int length = message.getSerializedSize();
return length + CodedOutputStream.computeRawVarint32Size(length);
}
/** /**
* Setup response for the IPC Call on Fatal Error from a * Setup response for the IPC Call on Fatal Error from a
* client that is using old version of Hadoop. * client that is using old version of Hadoop.