HBASE-22905 Avoid temp ByteBuffer allocation in (#538)

BlockingRpcConnection#writeRequest
This commit is contained in:
chenxu14 2019-09-02 13:01:36 +08:00 committed by stack
parent 0e87e86a29
commit 647613b308
2 changed files with 43 additions and 36 deletions

View File

@ -35,7 +35,6 @@ import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Locale;
@ -70,6 +69,8 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
@ -599,37 +600,44 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
* @see #readResponse()
*/
private void writeRequest(Call call) throws IOException {
ByteBuffer cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec,
this.compressor, call.cells);
CellBlockMeta cellBlockMeta;
if (cellBlock != null) {
cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build();
} else {
cellBlockMeta = null;
}
RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
setupIOstreams();
// Now we're going to write the call. We take the lock, then check that the connection
// is still valid, and, if so we do the write to the socket. If the write fails, we don't
// know where we stand, we have to close the connection.
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
calls.put(call.id, call); // We put first as we don't want the connection to become idle.
// from here, we do not throw any exception to upper layer as the call has been tracked in the
// pending calls map.
ByteBuf cellBlock = null;
try {
call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
} catch (Throwable t) {
if(LOG.isTraceEnabled()) {
LOG.trace("Error while writing call, call_id:" + call.id, t);
cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor,
call.cells, PooledByteBufAllocator.DEFAULT);
CellBlockMeta cellBlockMeta;
if (cellBlock != null) {
cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build();
} else {
cellBlockMeta = null;
}
RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
setupIOstreams();
// Now we're going to write the call. We take the lock, then check that the connection
// is still valid, and, if so we do the write to the socket. If the write fails, we don't
// know where we stand, we have to close the connection.
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
calls.put(call.id, call); // We put first as we don't want the connection to become idle.
// from here, we do not throw any exception to upper layer as the call has been tracked in
// the pending calls map.
try {
call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
} catch (Throwable t) {
if(LOG.isTraceEnabled()) {
LOG.trace("Error while writing call, call_id:" + call.id, t);
}
IOException e = IPCUtil.toIOE(t);
closeConn(e);
return;
}
} finally {
if (cellBlock != null) {
cellBlock.release();
}
IOException e = IPCUtil.toIOE(t);
closeConn(e);
return;
}
notifyAll();
}

View File

@ -23,7 +23,6 @@ import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@ -41,7 +40,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
@ -62,19 +61,19 @@ class IPCUtil {
* @throws IOException if write action fails
*/
public static int write(final OutputStream dos, final Message header, final Message param,
final ByteBuffer cellBlock) throws IOException {
final ByteBuf cellBlock) throws IOException {
// Must calculate total size and write that first so other side can read it all in in one
// swoop. This is dictated by how the server is currently written. Server needs to change
// if we are to be able to write without the length prefixing.
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
if (cellBlock != null) {
totalSize += cellBlock.remaining();
totalSize += cellBlock.readableBytes();
}
return write(dos, header, param, cellBlock, totalSize);
}
private static int write(final OutputStream dos, final Message header, final Message param,
final ByteBuffer cellBlock, final int totalSize) throws IOException {
final ByteBuf cellBlock, final int totalSize) throws IOException {
// I confirmed toBytes does same as DataOutputStream#writeInt.
dos.write(Bytes.toBytes(totalSize));
// This allocates a buffer that is the size of the message internally.
@ -83,7 +82,7 @@ class IPCUtil {
param.writeDelimitedTo(dos);
}
if (cellBlock != null) {
dos.write(cellBlock.array(), 0, cellBlock.remaining());
cellBlock.readBytes(dos, cellBlock.readableBytes());
}
dos.flush();
return totalSize;