HBASE-15202 Reduce garbage while setting response (Ram)
This commit is contained in:
parent
1b420be56a
commit
38e79b09c9
|
@ -136,6 +136,7 @@ import org.apache.htrace.TraceInfo;
|
|||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.protobuf.BlockingService;
|
||||
import com.google.protobuf.CodedInputStream;
|
||||
import com.google.protobuf.CodedOutputStream;
|
||||
import com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
@ -431,14 +432,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
}
|
||||
Message header = headerBuilder.build();
|
||||
|
||||
// Organize the response as a set of bytebuffers rather than collect it all together inside
|
||||
// one big byte array; save on allocations.
|
||||
ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
|
||||
ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
|
||||
int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
|
||||
(this.cellBlock == null? 0: this.cellBlock.limit());
|
||||
ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
|
||||
bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
|
||||
byte[] b = createHeaderAndMessageBytes(result, header);
|
||||
|
||||
bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock);
|
||||
|
||||
if (connection.useWrap) {
|
||||
bc = wrapWithSasl(bc);
|
||||
}
|
||||
|
@ -448,6 +445,44 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
this.response = bc;
|
||||
}
|
||||
|
||||
private byte[] createHeaderAndMessageBytes(Message result, Message header)
|
||||
throws IOException {
|
||||
// Organize the response as a set of bytebuffers rather than collect it all together inside
|
||||
// one big byte array; save on allocations.
|
||||
int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
|
||||
resultVintSize = 0;
|
||||
if (header != null) {
|
||||
headerSerializedSize = header.getSerializedSize();
|
||||
headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize);
|
||||
}
|
||||
if (result != null) {
|
||||
resultSerializedSize = result.getSerializedSize();
|
||||
resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize);
|
||||
}
|
||||
// calculate the total size
|
||||
int totalSize = headerSerializedSize + headerVintSize
|
||||
+ (resultSerializedSize + resultVintSize)
|
||||
+ (this.cellBlock == null ? 0 : this.cellBlock.limit());
|
||||
// The byte[] should also hold the totalSize of the header, message and the cellblock
|
||||
byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
|
||||
+ resultVintSize + Bytes.SIZEOF_INT];
|
||||
// The RpcClient expects the int to be in a format that code be decoded by
|
||||
// the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int)
|
||||
// form of writing int.
|
||||
Bytes.putInt(b, 0, totalSize);
|
||||
CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT,
|
||||
b.length - Bytes.SIZEOF_INT);
|
||||
if (header != null) {
|
||||
cos.writeMessageNoTag(header);
|
||||
}
|
||||
if (result != null) {
|
||||
cos.writeMessageNoTag(result);
|
||||
}
|
||||
cos.flush();
|
||||
cos.checkNoSpaceLeft();
|
||||
return b;
|
||||
}
|
||||
|
||||
private BufferChain wrapWithSasl(BufferChain bc)
|
||||
throws IOException {
|
||||
if (!this.connection.useSasl) return bc;
|
||||
|
|
Loading…
Reference in New Issue