diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java index bd0515a6be8..5d108e028e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java @@ -34,7 +34,7 @@ class BufferChain { private int bufferOffset = 0; private int size; - BufferChain(ByteBuffer[] buffers) { + BufferChain(ByteBuffer... buffers) { for (ByteBuffer b : buffers) { this.remaining += b.remaining(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index 4a4ddbac96d..4b06fabb39f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -18,28 +18,19 @@ package org.apache.hadoop.hbase.ipc; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.util.concurrent.GlobalEventExecutor; @@ -72,11 +63,11 @@ public class NettyRpcServer extends RpcServer { public static final Log LOG = LogFactory.getLog(NettyRpcServer.class); - protected final InetSocketAddress bindAddress; + private final InetSocketAddress bindAddress; private final CountDownLatch closed = new CountDownLatch(1); private final Channel serverChannel; - private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);; + private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public NettyRpcServer(final Server server, final String name, final List services, @@ -107,7 +98,21 @@ public class NettyRpcServer extends RpcServer { bootstrap.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive); bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); - bootstrap.childHandler(new Initializer(maxRequestSize)); + bootstrap.childHandler(new ChannelInitializer() { + + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6); + preambleDecoder.setSingleDecode(true); + pipeline.addLast("preambleDecoder", preambleDecoder); + pipeline.addLast("preambleHandler", new NettyRpcServerPreambleHandler(NettyRpcServer.this)); + pipeline.addLast("frameDecoder", + new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 4, 0, 4, true)); + pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics)); + pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics)); + } + }); try { serverChannel = bootstrap.bind(this.bindAddress).sync().channel(); @@ -173,125 +178,6 @@ public class NettyRpcServer extends RpcServer { return ((InetSocketAddress) serverChannel.localAddress()); } - private class Initializer extends ChannelInitializer { - - final int maxRequestSize; - - Initializer(int maxRequestSize) { - this.maxRequestSize = maxRequestSize; - } - - @Override - protected void initChannel(SocketChannel channel) throws Exception { - ChannelPipeline pipeline = channel.pipeline(); - pipeline.addLast("header", new ConnectionHeaderHandler()); - pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder( - maxRequestSize, 0, 4, 0, 4, true)); - pipeline.addLast("decoder", new MessageDecoder()); - pipeline.addLast("encoder", new MessageEncoder()); - } - - } - - private class ConnectionHeaderHandler extends ByteToMessageDecoder { - private NettyServerRpcConnection connection; - - ConnectionHeaderHandler() { - } - - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, - List out) throws Exception { - if (byteBuf.readableBytes() < 6) { - return; - } - connection = new NettyServerRpcConnection(NettyRpcServer.this, ctx.channel()); - connection.readPreamble(byteBuf); - ((MessageDecoder) ctx.pipeline().get("decoder")) - .setConnection(connection); - ctx.pipeline().remove(this); - } - - } - - private class MessageDecoder extends ChannelInboundHandlerAdapter { - - private NettyServerRpcConnection connection; - - void setConnection(NettyServerRpcConnection connection) { - this.connection = connection; - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - allChannels.add(ctx.channel()); - if (LOG.isDebugEnabled()) { - LOG.debug("Connection from " + ctx.channel().remoteAddress() - + "; # active connections: " + getNumOpenConnections()); - } - super.channelActive(ctx); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - ByteBuf input = (ByteBuf) msg; - // 4 bytes length field - metrics.receivedBytes(input.readableBytes() + 4); - connection.process(input); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - allChannels.remove(ctx.channel()); - if (LOG.isDebugEnabled()) { - LOG.debug("Disconnecting client: " + ctx.channel().remoteAddress() - + ". Number of active connections: " + getNumOpenConnections()); - } - super.channelInactive(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) { - allChannels.remove(ctx.channel()); - if (LOG.isDebugEnabled()) { - LOG.debug("Connection from " + ctx.channel().remoteAddress() - + " catch unexpected exception from downstream.", e.getCause()); - } - ctx.channel().close(); - } - - } - - private class MessageEncoder extends ChannelOutboundHandlerAdapter { - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - final NettyServerCall call = (NettyServerCall) msg; - ByteBuf response = Unpooled.wrappedBuffer(call.response.getBuffers()); - ctx.write(response, promise).addListener(new CallWriteListener(call)); - } - - } - - private class CallWriteListener implements ChannelFutureListener { - - private NettyServerCall call; - - CallWriteListener(NettyServerCall call) { - this.call = call; - } - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - call.done(); - if (future.isSuccess()) { - metrics.sentBytes(call.response.size()); - } - } - - } - @Override public void setSocketSendBufSize(int size) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java new file mode 100644 index 00000000000..3754d4414e5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.SimpleChannelInboundHandler; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Handle connection preamble. + */ +@InterfaceAudience.Private +class NettyRpcServerPreambleHandler extends SimpleChannelInboundHandler { + + private final NettyRpcServer rpcServer; + + public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer) { + this.rpcServer = rpcServer; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + NettyServerRpcConnection conn = new NettyServerRpcConnection(rpcServer, ctx.channel()); + ByteBuffer buf = ByteBuffer.allocate(msg.readableBytes()); + msg.readBytes(buf); + buf.flip(); + if (!conn.processPreamble(buf)) { + conn.close(); + return; + } + ChannelPipeline p = ctx.pipeline(); + ((NettyRpcServerRequestDecoder) p.get("decoder")).setConnection(conn); + p.remove(this); + p.remove("preambleDecoder"); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java new file mode 100644 index 00000000000..a40e9d3bc34 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.group.ChannelGroup; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Decoder for rpc request. + */ +@InterfaceAudience.Private +class NettyRpcServerRequestDecoder extends ChannelInboundHandlerAdapter { + + private final ChannelGroup allChannels; + + private final MetricsHBaseServer metrics; + + public NettyRpcServerRequestDecoder(ChannelGroup allChannels, MetricsHBaseServer metrics) { + this.allChannels = allChannels; + this.metrics = metrics; + } + + private NettyServerRpcConnection connection; + + void setConnection(NettyServerRpcConnection connection) { + this.connection = connection; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + allChannels.add(ctx.channel()); + if (NettyRpcServer.LOG.isDebugEnabled()) { + NettyRpcServer.LOG.debug("Connection from " + ctx.channel().remoteAddress() + + "; # active connections: " + (allChannels.size() - 1)); + } + super.channelActive(ctx); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ByteBuf input = (ByteBuf) msg; + // 4 bytes length field + metrics.receivedBytes(input.readableBytes() + 4); + connection.process(input); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + allChannels.remove(ctx.channel()); + if (NettyRpcServer.LOG.isDebugEnabled()) { + NettyRpcServer.LOG.debug("Disconnecting client: " + ctx.channel().remoteAddress() + + ". Number of active connections: " + (allChannels.size() - 1)); + } + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) { + allChannels.remove(ctx.channel()); + if (NettyRpcServer.LOG.isDebugEnabled()) { + NettyRpcServer.LOG.debug("Connection from " + ctx.channel().remoteAddress() + + " catch unexpected exception from downstream.", + e.getCause()); + } + ctx.channel().close(); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java new file mode 100644 index 00000000000..b5b6a6b7de2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Encoder for {@link RpcResponse}. + */ +@InterfaceAudience.Private +class NettyRpcServerResponseEncoder extends ChannelOutboundHandlerAdapter { + + private final MetricsHBaseServer metrics; + + NettyRpcServerResponseEncoder(MetricsHBaseServer metrics) { + this.metrics = metrics; + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + if (msg instanceof RpcResponse) { + RpcResponse resp = (RpcResponse) msg; + BufferChain buf = resp.getResponse(); + ctx.write(Unpooled.wrappedBuffer(buf.getBuffers()), promise).addListener(f -> { + resp.done(); + if (f.isSuccess()) { + metrics.sentBytes(buf.size()); + } + }); + } else { + ctx.write(msg, promise); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java index 3cb9a5a7acc..bba2536544f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.ipc; -import io.netty.channel.ChannelFutureListener; - import java.io.IOException; import java.net.InetAddress; @@ -53,10 +51,8 @@ class NettyServerCall extends ServerCall { */ @Override public synchronized void sendResponseIfReady() throws IOException { + // set param null to reduce memory pressure + this.param = null; connection.channel.writeAndFlush(this); } - - public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException { - connection.channel.writeAndFlush(this).addListener(listener); - } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java index 7985295ebc0..61e12ab33dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java @@ -19,30 +19,21 @@ package org.apache.hadoop.hbase.ipc; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; -import io.netty.channel.ChannelFutureListener; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.Arrays; import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; -import org.apache.hadoop.hbase.security.AccessDeniedException; -import org.apache.hadoop.hbase.security.AuthMethod; -import org.apache.hadoop.hbase.security.SaslStatus; -import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.IntWritable; import org.apache.htrace.TraceInfo; /** @@ -64,75 +55,6 @@ class NettyServerRpcConnection extends ServerRpcConnection { this.hostAddress = inetSocketAddress.getAddress().getHostAddress(); } this.remotePort = inetSocketAddress.getPort(); - this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null, - null, System.currentTimeMillis(), 0, rpcServer.reservoir, rpcServer.cellBlockBuilder, null); - this.setConnectionHeaderResponseCall = new NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID, - null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0, - rpcServer.reservoir, rpcServer.cellBlockBuilder, null); - this.authFailedCall = new NettyServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null, - null, this, 0, null, null, System.currentTimeMillis(), 0, rpcServer.reservoir, - rpcServer.cellBlockBuilder, null); - } - - void readPreamble(ByteBuf buffer) throws IOException { - byte[] rpcHead = { buffer.readByte(), buffer.readByte(), buffer.readByte(), buffer.readByte() }; - if (!Arrays.equals(HConstants.RPC_HEADER, rpcHead)) { - doBadPreambleHandling("Expected HEADER=" + Bytes.toStringBinary(HConstants.RPC_HEADER) + - " but received HEADER=" + Bytes.toStringBinary(rpcHead) + " from " + toString()); - return; - } - // Now read the next two bytes, the version and the auth to use. - int version = buffer.readByte(); - byte authbyte = buffer.readByte(); - this.authMethod = AuthMethod.valueOf(authbyte); - if (version != NettyRpcServer.CURRENT_VERSION) { - String msg = getFatalConnectionString(version, authbyte); - doBadPreambleHandling(msg, new WrongVersionException(msg)); - return; - } - if (authMethod == null) { - String msg = getFatalConnectionString(version, authbyte); - doBadPreambleHandling(msg, new BadAuthException(msg)); - return; - } - if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) { - if (this.rpcServer.allowFallbackToSimpleAuth) { - this.rpcServer.metrics.authenticationFallback(); - authenticatedWithFallback = true; - } else { - AccessDeniedException ae = new AccessDeniedException("Authentication is required"); - this.rpcServer.setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage()); - ((NettyServerCall) authFailedCall).sendResponseIfReady(ChannelFutureListener.CLOSE); - return; - } - } - if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) { - doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, - null); - authMethod = AuthMethod.SIMPLE; - // client has already sent the initial Sasl message and we - // should ignore it. Both client and server should fall back - // to simple auth from now on. - skipInitialSaslHandshake = true; - } - if (authMethod != AuthMethod.SIMPLE) { - useSasl = true; - } - connectionPreambleRead = true; - } - - private void doBadPreambleHandling(final String msg) throws IOException { - doBadPreambleHandling(msg, new FatalConnectionException(msg)); - } - - private void doBadPreambleHandling(final String msg, final Exception e) throws IOException { - NettyRpcServer.LOG.warn(msg); - NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, null, this, -1, null, - null, System.currentTimeMillis(), 0, this.rpcServer.reservoir, - this.rpcServer.cellBlockBuilder, null); - this.rpcServer.setupResponse(null, fakeCall, e, msg); - // closes out the connection. - fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE); } void process(final ByteBuf buf) throws IOException, InterruptedException { @@ -145,9 +67,8 @@ class NettyServerRpcConnection extends ServerRpcConnection { }; process(new SingleByteBuff(buf.nioBuffer())); } else { - byte[] data = new byte[buf.readableBytes()]; - buf.readBytes(data, 0, data.length); - ByteBuffer connectionHeader = ByteBuffer.wrap(data); + ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes()); + buf.readBytes(connectionHeader); buf.release(); process(connectionHeader); } @@ -203,4 +124,9 @@ class NettyServerRpcConnection extends ServerRpcConnection { remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir, this.rpcServer.cellBlockBuilder, reqCleanup); } + + @Override + protected void doRespond(RpcResponse resp) { + channel.writeAndFlush(resp); + } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcResponse.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcResponse.java new file mode 100644 index 00000000000..a8c13548cf2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcResponse.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * An interface represent the response of an rpc call. + */ +@InterfaceAudience.Private +interface RpcResponse { + + BufferChain getResponse(); + + default void done() { + // nothing + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index d68a05e1239..f899867facd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -22,7 +22,6 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY import com.google.common.annotations.VisibleForTesting; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -341,19 +340,6 @@ public abstract class RpcServer implements RpcServerInterface, } } - /** - * Setup response for the RPC Call. - * @param response buffer to serialize the response into - * @param call {@link ServerCall} to which we are setting up the response - * @param error error message, if the call failed - * @throws IOException - */ - protected void setupResponse(ByteArrayOutputStream response, ServerCall call, Throwable t, - String error) throws IOException { - if (response != null) response.reset(); - call.setResponse(null, null, t, error); - } - Configuration getConf() { return conf; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index 15fe3e6c8eb..e4bef988c93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -51,7 +51,7 @@ import org.apache.htrace.TraceInfo; * the result. */ @InterfaceAudience.Private -abstract class ServerCall implements RpcCall { +abstract class ServerCall implements RpcCall, RpcResponse { protected final int id; // the client's call id protected final BlockingService service; @@ -127,7 +127,8 @@ abstract class ServerCall implements RpcCall { */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "Presume the lock on processing request held by caller is protection enough") - void done() { + @Override + public void done() { if (this.cellBlockStream != null) { // This will return back the BBs which we got from pool. this.cellBlockStream.releaseResources(); @@ -178,18 +179,6 @@ abstract class ServerCall implements RpcCall { " deadline: " + deadline; } - protected synchronized void setSaslTokenResponse(ByteBuffer response) { - ByteBuffer[] responseBufs = new ByteBuffer[1]; - responseBufs[0] = response; - this.response = new BufferChain(responseBufs); - } - - protected synchronized void setConnectionHeaderResponse(ByteBuffer response) { - ByteBuffer[] responseBufs = new ByteBuffer[1]; - responseBufs[0] = response; - this.response = new BufferChain(responseBufs); - } - @Override public synchronized void setResponse(Message m, final CellScanner cells, Throwable t, String errorMsg) { @@ -268,7 +257,7 @@ abstract class ServerCall implements RpcCall { } } - protected void setExceptionResponse(Throwable t, String errorMsg, + static void setExceptionResponse(Throwable t, String errorMsg, ResponseHeader.Builder headerBuilder) { ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder(); exceptionBuilder.setExceptionClassName(t.getClass().getName()); @@ -286,7 +275,7 @@ abstract class ServerCall implements RpcCall { headerBuilder.setException(exceptionBuilder.build()); } - protected ByteBuffer createHeaderAndMessageBytes(Message result, Message header, + static ByteBuffer createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize, List cellBlock) throws IOException { // Organize the response as a set of bytebuffers rather than collect it all together inside // one big byte array; save on allocations. @@ -336,7 +325,7 @@ abstract class ServerCall implements RpcCall { } } - private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf) + private static void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf) throws IOException { ByteBufferUtils.putInt(pbBuf, totalSize); // create COS that works on BB @@ -351,7 +340,7 @@ abstract class ServerCall implements RpcCall { cos.checkNoSpaceLeft(); } - private ByteBuffer createHeaderAndMessageBytes(Message result, Message header, + private static ByteBuffer createHeaderAndMessageBytes(Message result, Message header, int totalSize, int totalPBSize) throws IOException { ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize); writeToCOS(result, header, totalSize, pbBuf); @@ -523,4 +512,8 @@ abstract class ServerCall implements RpcCall { return tinfo; } + @Override + public synchronized BufferChain getResponse() { + return response; + } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java index d4ab95c242b..c652afac855 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java @@ -17,8 +17,9 @@ */ package org.apache.hadoop.hbase.ipc; +import static org.apache.hadoop.hbase.HConstants.RPC_HEADER; + import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.DataOutputStream; import java.io.IOException; @@ -68,9 +69,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.compress.CompressionCodec; @@ -89,8 +92,6 @@ import org.apache.htrace.TraceInfo; abstract class ServerRpcConnection implements Closeable { /** */ protected final RpcServer rpcServer; - // If initial preamble with version and magic has been read or not. - protected boolean connectionPreambleRead = false; // If the connection header has been read or not. protected boolean connectionHeaderRead = false; @@ -124,17 +125,6 @@ abstract class ServerRpcConnection implements Closeable { protected CryptoAES cryptoAES; protected boolean useWrap = false; protected boolean useCryptoAesWrap = false; - // Fake 'call' for failed authorization response - protected static final int AUTHORIZATION_FAILED_CALLID = -1; - protected ServerCall authFailedCall; - protected ByteArrayOutputStream authFailedResponse = - new ByteArrayOutputStream(); - // Fake 'call' for SASL context setup - protected static final int SASL_CALLID = -33; - protected ServerCall saslCall; - // Fake 'call' for connection header response - protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34; - protected ServerCall setConnectionHeaderResponseCall; // was authentication allowed with a fallback to simple auth protected boolean authenticatedWithFallback; @@ -340,15 +330,13 @@ abstract class ServerRpcConnection implements Closeable { /** * No protobuf encoding of raw sasl messages */ - protected void doRawSaslReply(SaslStatus status, Writable rv, + protected final void doRawSaslReply(SaslStatus status, Writable rv, String errorClass, String error) throws IOException { - ByteBufferOutputStream saslResponse = null; - DataOutputStream out = null; - try { - // In my testing, have noticed that sasl messages are usually - // in the ballpark of 100-200. That's why the initial capacity is 256. - saslResponse = new ByteBufferOutputStream(256); - out = new DataOutputStream(saslResponse); + BufferChain bc; + // In my testing, have noticed that sasl messages are usually + // in the ballpark of 100-200. That's why the initial capacity is 256. + try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256); + DataOutputStream out = new DataOutputStream(saslResponse)) { out.writeInt(status.state); // write status if (status == SaslStatus.SUCCESS) { rv.write(out); @@ -356,16 +344,9 @@ abstract class ServerRpcConnection implements Closeable { WritableUtils.writeString(out, errorClass); WritableUtils.writeString(out, error); } - saslCall.setSaslTokenResponse(saslResponse.getByteBuffer()); - saslCall.sendResponseIfReady(); - } finally { - if (saslResponse != null) { - saslResponse.close(); - } - if (out != null) { - out.close(); - } + bc = new BufferChain(saslResponse.getByteBuffer()); } + doRespond(() -> bc); } public void saslReadAndProcess(ByteBuff saslToken) throws IOException, @@ -481,8 +462,7 @@ abstract class ServerRpcConnection implements Closeable { } } - private void processUnwrappedData(byte[] inBuf) throws IOException, - InterruptedException { + private void processUnwrappedData(byte[] inBuf) throws IOException, InterruptedException { ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf)); // Read all RPCs contained in the inBuf, even partial ones while (true) { @@ -536,7 +516,7 @@ abstract class ServerRpcConnection implements Closeable { } } - protected boolean authorizeConnection() throws IOException { + private boolean authorizeConnection() throws IOException { try { // If auth method is DIGEST, the token was obtained by the // real user for the effective user, therefore not required to @@ -553,16 +533,14 @@ abstract class ServerRpcConnection implements Closeable { RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae); } this.rpcServer.metrics.authorizationFailure(); - this.rpcServer.setupResponse(authFailedResponse, authFailedCall, - new AccessDeniedException(ae), ae.getMessage()); - authFailedCall.sendResponseIfReady(); + doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae))); return false; } return true; } // Reads the connection header following version - protected void processConnectionHeader(ByteBuff buf) throws IOException { + private void processConnectionHeader(ByteBuff buf) throws IOException { if (buf.hasArray()) { this.connectionHeader = ConnectionHeader.parseFrom(buf.array()); } else { @@ -630,6 +608,9 @@ abstract class ServerRpcConnection implements Closeable { } } + /** + * Send the response for connection header + */ private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) throws FatalConnectionException { // Response the connection header if Crypto AES is enabled @@ -640,38 +621,21 @@ abstract class ServerRpcConnection implements Closeable { byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4]; Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4); Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length); - - doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, unwrapped.length)); + byte[] wrapped = saslServer.wrap(unwrapped, 0, unwrapped.length); + BufferChain bc; + try (ByteBufferOutputStream response = new ByteBufferOutputStream(wrapped.length + 4); + DataOutputStream out = new DataOutputStream(response)) { + out.writeInt(wrapped.length); + out.write(wrapped); + bc = new BufferChain(response.getByteBuffer()); + } + doRespond(() -> bc); } catch (IOException ex) { throw new UnsupportedCryptoException(ex.getMessage(), ex); } } - /** - * Send the response for connection header - */ - private void doConnectionHeaderResponse(byte[] wrappedCipherMetaData) - throws IOException { - ByteBufferOutputStream response = null; - DataOutputStream out = null; - try { - response = new ByteBufferOutputStream(wrappedCipherMetaData.length + 4); - out = new DataOutputStream(response); - out.writeInt(wrappedCipherMetaData.length); - out.write(wrappedCipherMetaData); - - setConnectionHeaderResponseCall.setConnectionHeaderResponse(response - .getByteBuffer()); - setConnectionHeaderResponseCall.sendResponseIfReady(); - } finally { - if (out != null) { - out.close(); - } - if (response != null) { - response.close(); - } - } - } + protected abstract void doRespond(RpcResponse resp) throws IOException; /** * @param buf @@ -709,14 +673,14 @@ abstract class ServerRpcConnection implements Closeable { // Enforcing the call queue size, this triggers a retry in the client // This is a bit late to be doing this check - we have already read in the // total request. - if ((totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) { + if ((totalRequestSize + + this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) { final ServerCall callTooBig = createCall(id, this.service, null, null, null, null, totalRequestSize, null, null, 0, this.callCleanup); - ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); - this.rpcServer.setupResponse(responseBuffer, callTooBig, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, - "Call queue is full on " + this.rpcServer.server.getServerName() - + ", is hbase.ipc.server.max.callqueue.size too small?"); + callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, + "Call queue is full on " + this.rpcServer.server.getServerName() + + ", is hbase.ipc.server.max.callqueue.size too small?"); callTooBig.sendResponseIfReady(); return; } @@ -773,11 +737,9 @@ abstract class ServerRpcConnection implements Closeable { t = new DoNotRetryIOException(t); } - final ServerCall readParamsFailedCall = createCall(id, this.service, null, null, null, null, + ServerCall readParamsFailedCall = createCall(id, this.service, null, null, null, null, totalRequestSize, null, null, 0, this.callCleanup); - ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); - this.rpcServer.setupResponse(responseBuffer, readParamsFailedCall, t, - msg + "; " + t.getMessage()); + readParamsFailedCall.setResponse(null, null, t, msg + "; " + t.getMessage()); readParamsFailedCall.sendResponseIfReady(); return; } @@ -794,16 +756,81 @@ abstract class ServerRpcConnection implements Closeable { if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) { this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize()); - - ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); - this.rpcServer.setupResponse(responseBuffer, call, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, - "Call queue is full on " + this.rpcServer.server.getServerName() - + ", too many items queued ?"); + call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, + "Call queue is full on " + this.rpcServer.server.getServerName() + + ", too many items queued ?"); call.sendResponseIfReady(); } } + protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException { + ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1); + ServerCall.setExceptionResponse(e, msg, headerBuilder); + ByteBuffer headerBuf = + ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null); + BufferChain buf = new BufferChain(headerBuf); + return () -> buf; + } + + private void doBadPreambleHandling(String msg) throws IOException { + doBadPreambleHandling(msg, new FatalConnectionException(msg)); + } + + private void doBadPreambleHandling(String msg, Exception e) throws IOException { + SimpleRpcServer.LOG.warn(msg); + doRespond(getErrorResponse(msg, e)); + } + + protected final boolean processPreamble(ByteBuffer preambleBuffer) throws IOException { + assert preambleBuffer.remaining() == 6; + for (int i = 0; i < RPC_HEADER.length; i++) { + if (RPC_HEADER[i] != preambleBuffer.get()) { + doBadPreambleHandling( + "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER=" + + Bytes.toStringBinary(preambleBuffer.array(), 0, RPC_HEADER.length) + " from " + + toString()); + return false; + } + } + int version = preambleBuffer.get() & 0xFF; + byte authbyte = preambleBuffer.get(); + this.authMethod = AuthMethod.valueOf(authbyte); + if (version != SimpleRpcServer.CURRENT_VERSION) { + String msg = getFatalConnectionString(version, authbyte); + doBadPreambleHandling(msg, new WrongVersionException(msg)); + return false; + } + if (authMethod == null) { + String msg = getFatalConnectionString(version, authbyte); + doBadPreambleHandling(msg, new BadAuthException(msg)); + return false; + } + if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) { + if (this.rpcServer.allowFallbackToSimpleAuth) { + this.rpcServer.metrics.authenticationFallback(); + authenticatedWithFallback = true; + } else { + AccessDeniedException ae = new AccessDeniedException("Authentication is required"); + doRespond(getErrorResponse(ae.getMessage(), ae)); + return false; + } + } + if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) { + doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, + null); + authMethod = AuthMethod.SIMPLE; + // client has already sent the initial Sasl message and we + // should ignore it. Both client and server should fall back + // to simple auth from now on. + skipInitialSaslHandshake = true; + } + if (authMethod != AuthMethod.SIMPLE) { + useSasl = true; + } + return true; + } + public abstract boolean isConnectionOpen(); public abstract ServerCall createCall(int id, BlockingService service, MethodDescriptor md, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java index 5f072a9f7c0..c3f3f5d66b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java @@ -38,11 +38,11 @@ import org.apache.hadoop.util.StringUtils; */ @InterfaceAudience.Private class SimpleRpcServerResponder extends Thread { - /** */ + private final SimpleRpcServer simpleRpcServer; private final Selector writeSelector; private final Set writingCons = - Collections.newSetFromMap(new ConcurrentHashMap()); + Collections.newSetFromMap(new ConcurrentHashMap<>()); SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException { this.simpleRpcServer = simpleRpcServer; @@ -175,9 +175,9 @@ class SimpleRpcServerResponder extends Thread { if (connection == null) { throw new IllegalStateException("Coding error: SelectionKey key without attachment."); } - SimpleServerCall call = connection.responseQueue.peekFirst(); - if (call != null && now > call.lastSentTime + this.simpleRpcServer.purgeTimeout) { - conWithOldCalls.add(call.getConnection()); + if (connection.lastSentTime > 0 && + now > connection.lastSentTime + this.simpleRpcServer.purgeTimeout) { + conWithOldCalls.add(connection); } } } @@ -217,35 +217,37 @@ class SimpleRpcServerResponder extends Thread { /** * Process the response for this call. You need to have the lock on * {@link org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection#responseWriteLock} - * @param call the call * @return true if we proceed the call fully, false otherwise. * @throws IOException */ - boolean processResponse(final SimpleServerCall call) throws IOException { + private boolean processResponse(SimpleServerRpcConnection conn, RpcResponse resp) + throws IOException { boolean error = true; + BufferChain buf = resp.getResponse(); try { // Send as much data as we can in the non-blocking fashion long numBytes = - this.simpleRpcServer.channelWrite(call.getConnection().channel, call.response); + this.simpleRpcServer.channelWrite(conn.channel, buf); if (numBytes < 0) { - throw new HBaseIOException( - "Error writing on the socket " + "for the call:" + call.toShortString()); + throw new HBaseIOException("Error writing on the socket " + conn); } error = false; } finally { if (error) { - SimpleRpcServer.LOG.debug(getName() + call.toShortString() + ": output error -- closing"); + SimpleRpcServer.LOG.debug(conn + ": output error -- closing"); // We will be closing this connection itself. Mark this call as done so that all the // buffer(s) it got from pool can get released - call.done(); - this.simpleRpcServer.closeConnection(call.getConnection()); + resp.done(); + this.simpleRpcServer.closeConnection(conn); } } - if (!call.response.hasRemaining()) { - call.done(); + if (!buf.hasRemaining()) { + resp.done(); return true; } else { + // set the serve time when the response has to be sent later + conn.lastSentTime = System.currentTimeMillis(); return false; // Socket can't take more, we will have to come back. } } @@ -263,12 +265,12 @@ class SimpleRpcServerResponder extends Thread { try { for (int i = 0; i < 20; i++) { // protection if some handlers manage to need all the responder - SimpleServerCall call = connection.responseQueue.pollFirst(); - if (call == null) { + RpcResponse resp = connection.responseQueue.pollFirst(); + if (resp == null) { return true; } - if (!processResponse(call)) { - connection.responseQueue.addFirst(call); + if (!processResponse(connection, resp)) { + connection.responseQueue.addFirst(resp); return false; } } @@ -282,35 +284,30 @@ class SimpleRpcServerResponder extends Thread { // // Enqueue a response from the application. // - void doRespond(SimpleServerCall call) throws IOException { + void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws IOException { boolean added = false; - // If there is already a write in progress, we don't wait. This allows to free the handlers // immediately for other tasks. - if (call.getConnection().responseQueue.isEmpty() && - call.getConnection().responseWriteLock.tryLock()) { + if (conn.responseQueue.isEmpty() && conn.responseWriteLock.tryLock()) { try { - if (call.getConnection().responseQueue.isEmpty()) { + if (conn.responseQueue.isEmpty()) { // If we're alone, we can try to do a direct call to the socket. It's - // an optimisation to save on context switches and data transfer between cores.. - if (processResponse(call)) { + // an optimization to save on context switches and data transfer between cores.. + if (processResponse(conn, resp)) { return; // we're done. } // Too big to fit, putting ahead. - call.getConnection().responseQueue.addFirst(call); + conn.responseQueue.addFirst(resp); added = true; // We will register to the selector later, outside of the lock. } } finally { - call.getConnection().responseWriteLock.unlock(); + conn.responseWriteLock.unlock(); } } if (!added) { - call.getConnection().responseQueue.addLast(call); + conn.responseQueue.addLast(resp); } - call.responder.registerForWrite(call.getConnection()); - - // set the serve time when the response has to be sent later - call.lastSentTime = System.currentTimeMillis(); + registerForWrite(conn); } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java index af575ea42da..080c4ba5f05 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java @@ -37,8 +37,6 @@ import org.apache.htrace.TraceInfo; @InterfaceAudience.Private class SimpleServerCall extends ServerCall { - long lastSentTime; - final SimpleRpcServerResponder responder; @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH", @@ -59,7 +57,7 @@ class SimpleServerCall extends ServerCall { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "Presume the lock on processing request held by caller is protection enough") @Override - void done() { + public void done() { super.done(); this.getConnection().decRpcCount(); // Say that we're done with this call. } @@ -68,10 +66,10 @@ class SimpleServerCall extends ServerCall { public synchronized void sendResponseIfReady() throws IOException { // set param null to reduce memory pressure this.param = null; - this.responder.doRespond(this); + this.responder.doRespond(getConnection(), this); } SimpleServerRpcConnection getConnection() { - return (SimpleServerRpcConnection) this.connection; + return this.connection; } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java index b2507d8faed..2327c097eb8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java @@ -24,7 +24,6 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SocketChannel; -import java.util.Arrays; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; @@ -32,26 +31,19 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.exceptions.RequestTooBigException; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; -import org.apache.hadoop.hbase.security.AccessDeniedException; -import org.apache.hadoop.hbase.security.AuthMethod; -import org.apache.hadoop.hbase.security.SaslStatus; -import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.io.IntWritable; import org.apache.htrace.TraceInfo; /** Reads calls from a connection and queues them for handling. */ @@ -64,13 +56,17 @@ class SimpleServerRpcConnection extends ServerRpcConnection { private ByteBuff data; private ByteBuffer dataLengthBuffer; private ByteBuffer preambleBuffer; - protected final ConcurrentLinkedDeque responseQueue = - new ConcurrentLinkedDeque<>(); - final Lock responseWriteLock = new ReentrantLock(); private final LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs private long lastContact; private final Socket socket; - private final SimpleRpcServerResponder responder; + final SimpleRpcServerResponder responder; + + // If initial preamble with version and magic has been read or not. + private boolean connectionPreambleRead = false; + + final ConcurrentLinkedDeque responseQueue = new ConcurrentLinkedDeque<>(); + final Lock responseWriteLock = new ReentrantLock(); + long lastSentTime = -1L; public SimpleServerRpcConnection(SimpleRpcServer rpcServer, SocketChannel channel, long lastContact) { @@ -95,15 +91,6 @@ class SimpleServerRpcConnection extends ServerRpcConnection { "Connection: unable to set socket send buffer size to " + rpcServer.socketSendBufferSize); } } - this.saslCall = new SimpleServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null, - null, System.currentTimeMillis(), 0, rpcServer.reservoir, rpcServer.cellBlockBuilder, null, - rpcServer.responder); - this.setConnectionHeaderResponseCall = new SimpleServerCall(CONNECTION_HEADER_RESPONSE_CALLID, - null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0, - rpcServer.reservoir, rpcServer.cellBlockBuilder, null, rpcServer.responder); - this.authFailedCall = new SimpleServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null, - null, this, 0, null, null, System.currentTimeMillis(), 0, rpcServer.reservoir, - rpcServer.cellBlockBuilder, null, rpcServer.responder); this.responder = rpcServer.responder; } @@ -138,49 +125,9 @@ class SimpleServerRpcConnection extends ServerRpcConnection { if (count < 0 || preambleBuffer.remaining() > 0) { return count; } - // Check for 'HBas' magic. preambleBuffer.flip(); - for (int i = 0; i < HConstants.RPC_HEADER.length; i++) { - if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) { - return doBadPreambleHandling("Expected HEADER=" + - Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" + - Bytes.toStringBinary(preambleBuffer.array(), 0, HConstants.RPC_HEADER.length) + - " from " + toString()); - } - } - int version = preambleBuffer.get(HConstants.RPC_HEADER.length); - byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1); - this.authMethod = AuthMethod.valueOf(authbyte); - if (version != SimpleRpcServer.CURRENT_VERSION) { - String msg = getFatalConnectionString(version, authbyte); - return doBadPreambleHandling(msg, new WrongVersionException(msg)); - } - if (authMethod == null) { - String msg = getFatalConnectionString(version, authbyte); - return doBadPreambleHandling(msg, new BadAuthException(msg)); - } - if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) { - if (this.rpcServer.allowFallbackToSimpleAuth) { - this.rpcServer.metrics.authenticationFallback(); - authenticatedWithFallback = true; - } else { - AccessDeniedException ae = new AccessDeniedException("Authentication is required"); - this.rpcServer.setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage()); - authFailedCall.sendResponseIfReady(); - throw ae; - } - } - if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) { - doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, - null); - authMethod = AuthMethod.SIMPLE; - // client has already sent the initial Sasl message and we - // should ignore it. Both client and server should fall back - // to simple auth from now on. - skipInitialSaslHandshake = true; - } - if (authMethod != AuthMethod.SIMPLE) { - useSasl = true; + if (!processPreamble(preambleBuffer)) { + return -1; } preambleBuffer = null; // do not need it anymore connectionPreambleRead = true; @@ -272,19 +219,15 @@ class SimpleServerRpcConnection extends ServerRpcConnection { // Otherwise, throw a DoNotRetryIOException. if (VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(), RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) { - this.rpcServer.setupResponse(null, reqTooBig, SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION, - msg); + reqTooBig.setResponse(null, null, SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION, msg); } else { - this.rpcServer.setupResponse(null, reqTooBig, new DoNotRetryIOException(), msg); - } - // We are going to close the connection, make sure we process the response - // before that. In rare case when this fails, we still close the connection. - responseWriteLock.lock(); - try { - this.responder.processResponse(reqTooBig); - } finally { - responseWriteLock.unlock(); + reqTooBig.setResponse(null, null, new DoNotRetryIOException(), msg); } + // In most cases we will write out the response directly. If not, it is still OK to just + // close the connection without writing out the reqTooBig response. Do not try to write + // out directly here, and it will cause deserialization error if the connection is slow + // and we have a half writing response in the queue. + reqTooBig.sendResponseIfReady(); } // Close the connection return -1; @@ -365,21 +308,6 @@ class SimpleServerRpcConnection extends ServerRpcConnection { } } - private int doBadPreambleHandling(final String msg) throws IOException { - return doBadPreambleHandling(msg, new FatalConnectionException(msg)); - } - - private int doBadPreambleHandling(final String msg, final Exception e) throws IOException { - SimpleRpcServer.LOG.warn(msg); - SimpleServerCall fakeCall = new SimpleServerCall(-1, null, null, null, null, null, this, -1, - null, null, System.currentTimeMillis(), 0, this.rpcServer.reservoir, - this.rpcServer.cellBlockBuilder, null, responder); - this.rpcServer.setupResponse(null, fakeCall, e, msg); - this.responder.doRespond(fakeCall); - // Returning -1 closes out the connection. - return -1; - } - @Override public synchronized void close() { disposeSasl(); @@ -421,4 +349,9 @@ class SimpleServerRpcConnection extends ServerRpcConnection { remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir, this.rpcServer.cellBlockBuilder, reqCleanup, this.responder); } + + @Override + protected void doRespond(RpcResponse resp) throws IOException { + responder.doRespond(this, resp); + } } \ No newline at end of file