HBASE-25420 Some minor improvements in rpc implementation (#2792)
Signed-off-by: XinSun <ddupgs@gmail.com> Signed-off-by: stack <stack@apache.com>
This commit is contained in:
parent
914b356d15
commit
1385fb3560
|
@ -17,35 +17,35 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.codec.Codec;
|
||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
|
||||
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.codec.Codec;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
||||
/**
|
||||
* The netty rpc handler.
|
||||
|
@ -103,8 +103,8 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
|
|||
ctx.write(buf, withoutCellBlockPromise);
|
||||
ChannelPromise cellBlockPromise = ctx.newPromise();
|
||||
ctx.write(cellBlock, cellBlockPromise);
|
||||
PromiseCombiner combiner = new PromiseCombiner();
|
||||
combiner.addAll(withoutCellBlockPromise, cellBlockPromise);
|
||||
PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
|
||||
combiner.addAll((ChannelFuture) withoutCellBlockPromise, cellBlockPromise);
|
||||
combiner.finish(promise);
|
||||
} else {
|
||||
ctx.write(buf, promise);
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.ipc;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.client.VersionInfoUtil;
|
||||
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
|
||||
|
@ -30,6 +29,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
|
|||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.CorruptedFrameException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
|
||||
|
||||
|
@ -124,10 +124,8 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
|
|||
RPCProtos.RequestHeader header = getHeader(in, headerSize);
|
||||
|
||||
// Notify the client about the offending request
|
||||
NettyServerCall reqTooBig =
|
||||
new NettyServerCall(header.getCallId(), connection.service, null, null, null, null,
|
||||
connection, 0, connection.addr, System.currentTimeMillis(), 0,
|
||||
connection.rpcServer.bbAllocator, connection.rpcServer.cellBlockBuilder, null);
|
||||
NettyServerCall reqTooBig = connection.createCall(header.getCallId(), connection.service, null,
|
||||
null, null, null, 0, connection.addr, 0, null);
|
||||
|
||||
connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
|
||||
|
||||
|
|
|
@ -26,25 +26,27 @@ import java.util.Optional;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Datastructure that holds all necessary to a method invocation and then afterward, carries
|
||||
|
@ -217,10 +219,14 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setResponse(Message m, final CellScanner cells,
|
||||
Throwable t, String errorMsg) {
|
||||
if (this.isError) return;
|
||||
if (t != null) this.isError = true;
|
||||
public synchronized void setResponse(Message m, final CellScanner cells, Throwable t,
|
||||
String errorMsg) {
|
||||
if (this.isError) {
|
||||
return;
|
||||
}
|
||||
if (t != null) {
|
||||
this.isError = true;
|
||||
}
|
||||
BufferChain bc = null;
|
||||
try {
|
||||
ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
|
||||
|
@ -385,9 +391,10 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
|
|||
return pbBuf;
|
||||
}
|
||||
|
||||
protected BufferChain wrapWithSasl(BufferChain bc)
|
||||
throws IOException {
|
||||
if (!this.connection.useSasl) return bc;
|
||||
protected BufferChain wrapWithSasl(BufferChain bc) throws IOException {
|
||||
if (!this.connection.useSasl) {
|
||||
return bc;
|
||||
}
|
||||
// Looks like no way around this; saslserver wants a byte array. I have to make it one.
|
||||
// THIS IS A BIG UGLY COPY.
|
||||
byte [] responseBytes = bc.getBytes();
|
||||
|
|
|
@ -40,24 +40,23 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
|
||||
/**
|
||||
* The RPC server with native java NIO implementation deriving from Hadoop to
|
||||
|
@ -307,7 +306,7 @@ public class SimpleRpcServer extends RpcServer {
|
|||
// If the connectionManager can't take it, close the connection.
|
||||
if (c == null) {
|
||||
if (channel.isOpen()) {
|
||||
IOUtils.cleanup(null, channel);
|
||||
IOUtils.cleanupWithLogger(LOG, channel);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
@ -416,10 +415,12 @@ public class SimpleRpcServer extends RpcServer {
|
|||
@Override
|
||||
public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
|
||||
|
||||
/** Starts the service. Must be called before any calls will be handled. */
|
||||
/** Starts the service. Must be called before any calls will be handled. */
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
if (started) return;
|
||||
if (started) {
|
||||
return;
|
||||
}
|
||||
authTokenSecretMgr = createSecretManager();
|
||||
if (authTokenSecretMgr != null) {
|
||||
setSecretManager(authTokenSecretMgr);
|
||||
|
@ -433,7 +434,7 @@ public class SimpleRpcServer extends RpcServer {
|
|||
started = true;
|
||||
}
|
||||
|
||||
/** Stops the service. No new calls will be handled after this is called. */
|
||||
/** Stops the service. No new calls will be handled after this is called. */
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
LOG.info("Stopping server on " + port);
|
||||
|
@ -449,10 +450,9 @@ public class SimpleRpcServer extends RpcServer {
|
|||
notifyAll();
|
||||
}
|
||||
|
||||
/** Wait for the server to be stopped.
|
||||
* Does not wait for all subthreads to finish.
|
||||
* See {@link #stop()}.
|
||||
* @throws InterruptedException e
|
||||
/**
|
||||
* Wait for the server to be stopped. Does not wait for all subthreads to finish.
|
||||
* @see #stop()
|
||||
*/
|
||||
@Override
|
||||
public synchronized void join() throws InterruptedException {
|
||||
|
@ -503,13 +503,14 @@ public class SimpleRpcServer extends RpcServer {
|
|||
* @param channel writable byte channel to write to
|
||||
* @param bufferChain Chain of buffers to write
|
||||
* @return number of bytes written
|
||||
* @throws java.io.IOException e
|
||||
* @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
|
||||
*/
|
||||
protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
|
||||
throws IOException {
|
||||
long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
|
||||
if (count > 0) this.metrics.sentBytes(count);
|
||||
throws IOException {
|
||||
long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
|
||||
if (count > 0) {
|
||||
this.metrics.sentBytes(count);
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
|
@ -523,22 +524,20 @@ public class SimpleRpcServer extends RpcServer {
|
|||
* @throws UnknownHostException if the address isn't a valid host name
|
||||
* @throws IOException other random errors from bind
|
||||
*/
|
||||
public static void bind(ServerSocket socket, InetSocketAddress address,
|
||||
int backlog) throws IOException {
|
||||
public static void bind(ServerSocket socket, InetSocketAddress address, int backlog)
|
||||
throws IOException {
|
||||
try {
|
||||
socket.bind(address, backlog);
|
||||
} catch (BindException e) {
|
||||
BindException bindException =
|
||||
new BindException("Problem binding to " + address + " : " +
|
||||
e.getMessage());
|
||||
new BindException("Problem binding to " + address + " : " + e.getMessage());
|
||||
bindException.initCause(e);
|
||||
throw bindException;
|
||||
} catch (SocketException e) {
|
||||
// If they try to bind to a different host's address, give a better
|
||||
// error message.
|
||||
if ("Unresolved address".equals(e.getMessage())) {
|
||||
throw new UnknownHostException("Invalid hostname for server: " +
|
||||
address.getHostName());
|
||||
throw new UnknownHostException("Invalid hostname for server: " + address.getHostName());
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue