HBASE-22905 Avoid temp ByteBuffer allocation in (#538)

BlockingRpcConnection#writeRequest
This commit is contained in:
chenxu14 2019-09-02 13:01:36 +08:00 committed by Michael Stack
parent 693fa05273
commit d32bf8daed
2 changed files with 43 additions and 36 deletions

View File

@ -35,7 +35,6 @@ import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Locale; import java.util.Locale;
@ -70,6 +69,8 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder; import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
@ -599,37 +600,44 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
* @see #readResponse() * @see #readResponse()
*/ */
private void writeRequest(Call call) throws IOException { private void writeRequest(Call call) throws IOException {
ByteBuffer cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, ByteBuf cellBlock = null;
this.compressor, call.cells);
CellBlockMeta cellBlockMeta;
if (cellBlock != null) {
cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build();
} else {
cellBlockMeta = null;
}
RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
setupIOstreams();
// Now we're going to write the call. We take the lock, then check that the connection
// is still valid, and, if so we do the write to the socket. If the write fails, we don't
// know where we stand, we have to close the connection.
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
calls.put(call.id, call); // We put first as we don't want the connection to become idle.
// from here, we do not throw any exception to upper layer as the call has been tracked in the
// pending calls map.
try { try {
call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock)); cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor,
} catch (Throwable t) { call.cells, PooledByteBufAllocator.DEFAULT);
if(LOG.isTraceEnabled()) { CellBlockMeta cellBlockMeta;
LOG.trace("Error while writing call, call_id:" + call.id, t); if (cellBlock != null) {
cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build();
} else {
cellBlockMeta = null;
}
RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
setupIOstreams();
// Now we're going to write the call. We take the lock, then check that the connection
// is still valid, and, if so we do the write to the socket. If the write fails, we don't
// know where we stand, we have to close the connection.
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
calls.put(call.id, call); // We put first as we don't want the connection to become idle.
// from here, we do not throw any exception to upper layer as the call has been tracked in
// the pending calls map.
try {
call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
} catch (Throwable t) {
if(LOG.isTraceEnabled()) {
LOG.trace("Error while writing call, call_id:" + call.id, t);
}
IOException e = IPCUtil.toIOE(t);
closeConn(e);
return;
}
} finally {
if (cellBlock != null) {
cellBlock.release();
} }
IOException e = IPCUtil.toIOE(t);
closeConn(e);
return;
} }
notifyAll(); notifyAll();
} }

View File

@ -23,7 +23,6 @@ import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
@ -41,7 +40,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
@ -62,19 +61,19 @@ class IPCUtil {
* @throws IOException if write action fails * @throws IOException if write action fails
*/ */
public static int write(final OutputStream dos, final Message header, final Message param, public static int write(final OutputStream dos, final Message header, final Message param,
final ByteBuffer cellBlock) throws IOException { final ByteBuf cellBlock) throws IOException {
// Must calculate total size and write that first so other side can read it all in in one // Must calculate total size and write that first so other side can read it all in in one
// swoop. This is dictated by how the server is currently written. Server needs to change // swoop. This is dictated by how the server is currently written. Server needs to change
// if we are to be able to write without the length prefixing. // if we are to be able to write without the length prefixing.
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param); int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
if (cellBlock != null) { if (cellBlock != null) {
totalSize += cellBlock.remaining(); totalSize += cellBlock.readableBytes();
} }
return write(dos, header, param, cellBlock, totalSize); return write(dos, header, param, cellBlock, totalSize);
} }
private static int write(final OutputStream dos, final Message header, final Message param, private static int write(final OutputStream dos, final Message header, final Message param,
final ByteBuffer cellBlock, final int totalSize) throws IOException { final ByteBuf cellBlock, final int totalSize) throws IOException {
// I confirmed toBytes does same as DataOutputStream#writeInt. // I confirmed toBytes does same as DataOutputStream#writeInt.
dos.write(Bytes.toBytes(totalSize)); dos.write(Bytes.toBytes(totalSize));
// This allocates a buffer that is the size of the message internally. // This allocates a buffer that is the size of the message internally.
@ -83,7 +82,7 @@ class IPCUtil {
param.writeDelimitedTo(dos); param.writeDelimitedTo(dos);
} }
if (cellBlock != null) { if (cellBlock != null) {
dos.write(cellBlock.array(), 0, cellBlock.remaining()); cellBlock.readBytes(dos, cellBlock.readableBytes());
} }
dos.flush(); dos.flush();
return totalSize; return totalSize;