diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java index 649375a89c1..2a2df8a7ad4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java @@ -17,35 +17,35 @@ */ package org.apache.hadoop.hbase.ipc; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; - import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture; import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.hbase.CellScanner; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.ipc.RemoteException; /** * The netty rpc handler. @@ -103,8 +103,8 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler { ctx.write(buf, withoutCellBlockPromise); ChannelPromise cellBlockPromise = ctx.newPromise(); ctx.write(cellBlock, cellBlockPromise); - PromiseCombiner combiner = new PromiseCombiner(); - combiner.addAll(withoutCellBlockPromise, cellBlockPromise); + PromiseCombiner combiner = new PromiseCombiner(ctx.executor()); + combiner.addAll((ChannelFuture) withoutCellBlockPromise, cellBlockPromise); combiner.finish(promise); } else { ctx.write(buf, promise); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java index 5ed3d2ef43f..9444cd0dee9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; import java.util.List; - import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.exceptions.RequestTooBigException; @@ -30,6 +29,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener; import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder; import org.apache.hbase.thirdparty.io.netty.handler.codec.CorruptedFrameException; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; @@ -124,10 +124,8 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder { RPCProtos.RequestHeader header = getHeader(in, headerSize); // Notify the client about the offending request - NettyServerCall reqTooBig = - new NettyServerCall(header.getCallId(), connection.service, null, null, null, null, - connection, 0, connection.addr, System.currentTimeMillis(), 0, - connection.rpcServer.bbAllocator, connection.rpcServer.cellBlockBuilder, null); + NettyServerCall reqTooBig = connection.createCall(header.getCallId(), connection.service, null, + null, null, null, 0, connection.addr, 0, null); connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index d20e28f8c78..a5c8a3920b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -26,25 +26,27 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.io.ByteBuffAllocator; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.StringUtils; +import org.apache.yetus.audience.InterfaceAudience; + import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Message; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; -import org.apache.hadoop.hbase.util.ByteBufferUtils; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.StringUtils; /** * Datastructure that holds all necessary to a method invocation and then afterward, carries @@ -217,10 +219,14 @@ public abstract class ServerCall implements RpcCa } @Override - public synchronized void setResponse(Message m, final CellScanner cells, - Throwable t, String errorMsg) { - if (this.isError) return; - if (t != null) this.isError = true; + public synchronized void setResponse(Message m, final CellScanner cells, Throwable t, + String errorMsg) { + if (this.isError) { + return; + } + if (t != null) { + this.isError = true; + } BufferChain bc = null; try { ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder(); @@ -385,9 +391,10 @@ public abstract class ServerCall implements RpcCa return pbBuf; } - protected BufferChain wrapWithSasl(BufferChain bc) - throws IOException { - if (!this.connection.useSasl) return bc; + protected BufferChain wrapWithSasl(BufferChain bc) throws IOException { + if (!this.connection.useSasl) { + return bc; + } // Looks like no way around this; saslserver wants a byte array. I have to make it one. // THIS IS A BIG UGLY COPY. byte [] responseBytes = bc.getBytes(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java index f3f78073dc5..cbcbc9a8f7a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java @@ -40,24 +40,23 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.security.HBasePolicyProvider; -import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; -import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; -import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; /** * The RPC server with native java NIO implementation deriving from Hadoop to @@ -307,7 +306,7 @@ public class SimpleRpcServer extends RpcServer { // If the connectionManager can't take it, close the connection. if (c == null) { if (channel.isOpen()) { - IOUtils.cleanup(null, channel); + IOUtils.cleanupWithLogger(LOG, channel); } continue; } @@ -416,10 +415,12 @@ public class SimpleRpcServer extends RpcServer { @Override public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; } - /** Starts the service. Must be called before any calls will be handled. */ + /** Starts the service. Must be called before any calls will be handled. */ @Override public synchronized void start() { - if (started) return; + if (started) { + return; + } authTokenSecretMgr = createSecretManager(); if (authTokenSecretMgr != null) { setSecretManager(authTokenSecretMgr); @@ -433,7 +434,7 @@ public class SimpleRpcServer extends RpcServer { started = true; } - /** Stops the service. No new calls will be handled after this is called. */ + /** Stops the service. No new calls will be handled after this is called. */ @Override public synchronized void stop() { LOG.info("Stopping server on " + port); @@ -449,10 +450,9 @@ public class SimpleRpcServer extends RpcServer { notifyAll(); } - /** Wait for the server to be stopped. - * Does not wait for all subthreads to finish. - * See {@link #stop()}. - * @throws InterruptedException e + /** + * Wait for the server to be stopped. Does not wait for all subthreads to finish. + * @see #stop() */ @Override public synchronized void join() throws InterruptedException { @@ -503,13 +503,14 @@ public class SimpleRpcServer extends RpcServer { * @param channel writable byte channel to write to * @param bufferChain Chain of buffers to write * @return number of bytes written - * @throws java.io.IOException e * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer) */ protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain) - throws IOException { - long count = bufferChain.write(channel, NIO_BUFFER_LIMIT); - if (count > 0) this.metrics.sentBytes(count); + throws IOException { + long count = bufferChain.write(channel, NIO_BUFFER_LIMIT); + if (count > 0) { + this.metrics.sentBytes(count); + } return count; } @@ -523,22 +524,20 @@ public class SimpleRpcServer extends RpcServer { * @throws UnknownHostException if the address isn't a valid host name * @throws IOException other random errors from bind */ - public static void bind(ServerSocket socket, InetSocketAddress address, - int backlog) throws IOException { + public static void bind(ServerSocket socket, InetSocketAddress address, int backlog) + throws IOException { try { socket.bind(address, backlog); } catch (BindException e) { BindException bindException = - new BindException("Problem binding to " + address + " : " + - e.getMessage()); + new BindException("Problem binding to " + address + " : " + e.getMessage()); bindException.initCause(e); throw bindException; } catch (SocketException e) { // If they try to bind to a different host's address, give a better // error message. if ("Unresolved address".equals(e.getMessage())) { - throw new UnknownHostException("Invalid hostname for server: " + - address.getHostName()); + throw new UnknownHostException("Invalid hostname for server: " + address.getHostName()); } throw e; }