HBASE-25420 Some minor improvements in rpc implementation (#2792)

Signed-off-by: XinSun <ddupgs@gmail.com>
Signed-off-by: stack <stack@apache.com>
This commit is contained in:
Duo Zhang 2020-12-20 11:26:36 +08:00 committed by GitHub
parent 7b1e9cd0d2
commit 1540b89cee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 58 additions and 54 deletions

View File

@ -17,35 +17,35 @@
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder; import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler; import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner; import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
/** /**
* The netty rpc handler. * The netty rpc handler.
@ -103,8 +103,8 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
ctx.write(buf, withoutCellBlockPromise); ctx.write(buf, withoutCellBlockPromise);
ChannelPromise cellBlockPromise = ctx.newPromise(); ChannelPromise cellBlockPromise = ctx.newPromise();
ctx.write(cellBlock, cellBlockPromise); ctx.write(cellBlock, cellBlockPromise);
PromiseCombiner combiner = new PromiseCombiner(); PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
combiner.addAll(withoutCellBlockPromise, cellBlockPromise); combiner.addAll((ChannelFuture) withoutCellBlockPromise, cellBlockPromise);
combiner.finish(promise); combiner.finish(promise);
} else { } else {
ctx.write(buf, promise); ctx.write(buf, promise);

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.ipc;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException; import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
@ -30,6 +29,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder; import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.codec.CorruptedFrameException; import org.apache.hbase.thirdparty.io.netty.handler.codec.CorruptedFrameException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
@ -124,10 +124,8 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
RPCProtos.RequestHeader header = getHeader(in, headerSize); RPCProtos.RequestHeader header = getHeader(in, headerSize);
// Notify the client about the offending request // Notify the client about the offending request
NettyServerCall reqTooBig = NettyServerCall reqTooBig = connection.createCall(header.getCallId(), connection.service, null,
new NettyServerCall(header.getCallId(), connection.service, null, null, null, null, null, null, null, 0, connection.addr, 0, null);
connection, 0, connection.addr, System.currentTimeMillis(), 0,
connection.rpcServer.bbAllocator, connection.rpcServer.cellBlockBuilder, null);
connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION); connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);

View File

@ -26,25 +26,27 @@ import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
/** /**
* Datastructure that holds all necessary to a method invocation and then afterward, carries * Datastructure that holds all necessary to a method invocation and then afterward, carries
@ -217,10 +219,14 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
} }
@Override @Override
public synchronized void setResponse(Message m, final CellScanner cells, public synchronized void setResponse(Message m, final CellScanner cells, Throwable t,
Throwable t, String errorMsg) { String errorMsg) {
if (this.isError) return; if (this.isError) {
if (t != null) this.isError = true; return;
}
if (t != null) {
this.isError = true;
}
BufferChain bc = null; BufferChain bc = null;
try { try {
ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder(); ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
@ -385,9 +391,10 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
return pbBuf; return pbBuf;
} }
protected BufferChain wrapWithSasl(BufferChain bc) protected BufferChain wrapWithSasl(BufferChain bc) throws IOException {
throws IOException { if (!this.connection.useSasl) {
if (!this.connection.useSasl) return bc; return bc;
}
// Looks like no way around this; saslserver wants a byte array. I have to make it one. // Looks like no way around this; saslserver wants a byte array. I have to make it one.
// THIS IS A BIG UGLY COPY. // THIS IS A BIG UGLY COPY.
byte [] responseBytes = bc.getBytes(); byte [] responseBytes = bc.getBytes();

View File

@ -40,24 +40,23 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
/** /**
* The RPC server with native java NIO implementation deriving from Hadoop to * The RPC server with native java NIO implementation deriving from Hadoop to
@ -307,7 +306,7 @@ public class SimpleRpcServer extends RpcServer {
// If the connectionManager can't take it, close the connection. // If the connectionManager can't take it, close the connection.
if (c == null) { if (c == null) {
if (channel.isOpen()) { if (channel.isOpen()) {
IOUtils.cleanup(null, channel); IOUtils.cleanupWithLogger(LOG, channel);
} }
continue; continue;
} }
@ -416,10 +415,12 @@ public class SimpleRpcServer extends RpcServer {
@Override @Override
public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; } public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
/** Starts the service. Must be called before any calls will be handled. */ /** Starts the service. Must be called before any calls will be handled. */
@Override @Override
public synchronized void start() { public synchronized void start() {
if (started) return; if (started) {
return;
}
authTokenSecretMgr = createSecretManager(); authTokenSecretMgr = createSecretManager();
if (authTokenSecretMgr != null) { if (authTokenSecretMgr != null) {
setSecretManager(authTokenSecretMgr); setSecretManager(authTokenSecretMgr);
@ -433,7 +434,7 @@ public class SimpleRpcServer extends RpcServer {
started = true; started = true;
} }
/** Stops the service. No new calls will be handled after this is called. */ /** Stops the service. No new calls will be handled after this is called. */
@Override @Override
public synchronized void stop() { public synchronized void stop() {
LOG.info("Stopping server on " + port); LOG.info("Stopping server on " + port);
@ -449,10 +450,9 @@ public class SimpleRpcServer extends RpcServer {
notifyAll(); notifyAll();
} }
/** Wait for the server to be stopped. /**
* Does not wait for all subthreads to finish. * Wait for the server to be stopped. Does not wait for all subthreads to finish.
* See {@link #stop()}. * @see #stop()
* @throws InterruptedException e
*/ */
@Override @Override
public synchronized void join() throws InterruptedException { public synchronized void join() throws InterruptedException {
@ -503,13 +503,14 @@ public class SimpleRpcServer extends RpcServer {
* @param channel writable byte channel to write to * @param channel writable byte channel to write to
* @param bufferChain Chain of buffers to write * @param bufferChain Chain of buffers to write
* @return number of bytes written * @return number of bytes written
* @throws java.io.IOException e
* @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer) * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
*/ */
protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain) protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
throws IOException { throws IOException {
long count = bufferChain.write(channel, NIO_BUFFER_LIMIT); long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
if (count > 0) this.metrics.sentBytes(count); if (count > 0) {
this.metrics.sentBytes(count);
}
return count; return count;
} }
@ -523,22 +524,20 @@ public class SimpleRpcServer extends RpcServer {
* @throws UnknownHostException if the address isn't a valid host name * @throws UnknownHostException if the address isn't a valid host name
* @throws IOException other random errors from bind * @throws IOException other random errors from bind
*/ */
public static void bind(ServerSocket socket, InetSocketAddress address, public static void bind(ServerSocket socket, InetSocketAddress address, int backlog)
int backlog) throws IOException { throws IOException {
try { try {
socket.bind(address, backlog); socket.bind(address, backlog);
} catch (BindException e) { } catch (BindException e) {
BindException bindException = BindException bindException =
new BindException("Problem binding to " + address + " : " + new BindException("Problem binding to " + address + " : " + e.getMessage());
e.getMessage());
bindException.initCause(e); bindException.initCause(e);
throw bindException; throw bindException;
} catch (SocketException e) { } catch (SocketException e) {
// If they try to bind to a different host's address, give a better // If they try to bind to a different host's address, give a better
// error message. // error message.
if ("Unresolved address".equals(e.getMessage())) { if ("Unresolved address".equals(e.getMessage())) {
throw new UnknownHostException("Invalid hostname for server: " + throw new UnknownHostException("Invalid hostname for server: " + address.getHostName());
address.getHostName());
} }
throw e; throw e;
} }