HBASE-15202 Reduce garbage while setting response (Ram)

This commit is contained in:
ramkrishna 2016-02-04 23:23:31 +05:30
parent f5fba2ba0d
commit 7b33a740b1
1 changed files with 43 additions and 8 deletions

View File

@ -131,6 +131,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.BlockingService; import com.google.protobuf.BlockingService;
import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
@ -427,14 +428,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
} }
Message header = headerBuilder.build(); Message header = headerBuilder.build();
// Organize the response as a set of bytebuffers rather than collect it all together inside byte[] b = createHeaderAndMessageBytes(result, header);
// one big byte array; save on allocations.
ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header); bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock);
ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
(this.cellBlock == null? 0: this.cellBlock.limit());
ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
if (connection.useWrap) { if (connection.useWrap) {
bc = wrapWithSasl(bc); bc = wrapWithSasl(bc);
} }
@ -454,6 +451,44 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
} }
} }
private byte[] createHeaderAndMessageBytes(Message result, Message header)
throws IOException {
// Organize the response as a set of bytebuffers rather than collect it all together inside
// one big byte array; save on allocations.
int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
resultVintSize = 0;
if (header != null) {
headerSerializedSize = header.getSerializedSize();
headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize);
}
if (result != null) {
resultSerializedSize = result.getSerializedSize();
resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize);
}
// calculate the total size
int totalSize = headerSerializedSize + headerVintSize
+ (resultSerializedSize + resultVintSize)
+ (this.cellBlock == null ? 0 : this.cellBlock.limit());
// The byte[] should also hold the totalSize of the header, message and the cellblock
byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
+ resultVintSize + Bytes.SIZEOF_INT];
// The RpcClient expects the int to be in a format that code be decoded by
// the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int)
// form of writing int.
Bytes.putInt(b, 0, totalSize);
CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT,
b.length - Bytes.SIZEOF_INT);
if (header != null) {
cos.writeMessageNoTag(header);
}
if (result != null) {
cos.writeMessageNoTag(result);
}
cos.flush();
cos.checkNoSpaceLeft();
return b;
}
private BufferChain wrapWithSasl(BufferChain bc) private BufferChain wrapWithSasl(BufferChain bc)
throws IOException { throws IOException {
if (!this.connection.useSasl) return bc; if (!this.connection.useSasl) return bc;