HBASE-18013 Write response directly instead of creating a fake call when setup connection

zhangduo 2017-05-23 10:56:22 +08:00
parent 28d619b22b
commit 3f75ba195c
14 changed files with 433 additions and 460 deletions

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java (View File)

@ -34,7 +34,7 @@ class BufferChain {
private int bufferOffset = 0;
private int size;
BufferChain(ByteBuffer[] buffers) {
BufferChain(ByteBuffer... buffers) {
for (ByteBuffer b : buffers) {
this.remaining += b.remaining();
}
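Note on the hunk above: switching the BufferChain constructor to varargs lets the new connection-setup paths build a single-buffer chain without allocating an explicit array (for example the getErrorResponse() helper later in this patch). A minimal sketch, using a simplified stand-in class rather than the real BufferChain, which also tracks a buffer offset for partial socket writes:

import java.nio.ByteBuffer;

// Simplified stand-in for BufferChain, here only to show the varargs constructor;
// the real class also tracks a buffer offset so partial socket writes can resume.
class BufferChainSketch {
  private final ByteBuffer[] buffers;
  private int remaining = 0;

  BufferChainSketch(ByteBuffer... buffers) {
    this.buffers = buffers;
    for (ByteBuffer b : buffers) {
      this.remaining += b.remaining();
    }
  }

  int size() {
    return remaining;
  }

  public static void main(String[] args) {
    ByteBuffer header = ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 });
    // Single-buffer call sites no longer need "new ByteBuffer[] { header }".
    BufferChainSketch chain = new BufferChainSketch(header);
    System.out.println(chain.buffers.length + " buffer(s), " + chain.size() + " bytes");
  }
}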

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java (View File)

@ -18,28 +18,19 @@
package org.apache.hadoop.hbase.ipc;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.concurrent.GlobalEventExecutor;
@ -72,11 +63,11 @@ public class NettyRpcServer extends RpcServer {
public static final Log LOG = LogFactory.getLog(NettyRpcServer.class);
protected final InetSocketAddress bindAddress;
private final InetSocketAddress bindAddress;
private final CountDownLatch closed = new CountDownLatch(1);
private final Channel serverChannel;
private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);;
private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public NettyRpcServer(final Server server, final String name,
final List<BlockingServiceAndInterface> services,
@ -107,7 +98,21 @@ public class NettyRpcServer extends RpcServer {
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive);
bootstrap.childOption(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT);
bootstrap.childHandler(new Initializer(maxRequestSize));
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
preambleDecoder.setSingleDecode(true);
pipeline.addLast("preambleDecoder", preambleDecoder);
pipeline.addLast("preambleHandler", new NettyRpcServerPreambleHandler(NettyRpcServer.this));
pipeline.addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 4, 0, 4, true));
pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
}
});
try {
serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
@ -173,125 +178,6 @@ public class NettyRpcServer extends RpcServer {
return ((InetSocketAddress) serverChannel.localAddress());
}
private class Initializer extends ChannelInitializer<SocketChannel> {
final int maxRequestSize;
Initializer(int maxRequestSize) {
this.maxRequestSize = maxRequestSize;
}
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("header", new ConnectionHeaderHandler());
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
maxRequestSize, 0, 4, 0, 4, true));
pipeline.addLast("decoder", new MessageDecoder());
pipeline.addLast("encoder", new MessageEncoder());
}
}
private class ConnectionHeaderHandler extends ByteToMessageDecoder {
private NettyServerRpcConnection connection;
ConnectionHeaderHandler() {
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf,
List<Object> out) throws Exception {
if (byteBuf.readableBytes() < 6) {
return;
}
connection = new NettyServerRpcConnection(NettyRpcServer.this, ctx.channel());
connection.readPreamble(byteBuf);
((MessageDecoder) ctx.pipeline().get("decoder"))
.setConnection(connection);
ctx.pipeline().remove(this);
}
}
private class MessageDecoder extends ChannelInboundHandlerAdapter {
private NettyServerRpcConnection connection;
void setConnection(NettyServerRpcConnection connection) {
this.connection = connection;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
allChannels.add(ctx.channel());
if (LOG.isDebugEnabled()) {
LOG.debug("Connection from " + ctx.channel().remoteAddress()
+ "; # active connections: " + getNumOpenConnections());
}
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf input = (ByteBuf) msg;
// 4 bytes length field
metrics.receivedBytes(input.readableBytes() + 4);
connection.process(input);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
allChannels.remove(ctx.channel());
if (LOG.isDebugEnabled()) {
LOG.debug("Disconnecting client: " + ctx.channel().remoteAddress()
+ ". Number of active connections: " + getNumOpenConnections());
}
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
allChannels.remove(ctx.channel());
if (LOG.isDebugEnabled()) {
LOG.debug("Connection from " + ctx.channel().remoteAddress()
+ " catch unexpected exception from downstream.", e.getCause());
}
ctx.channel().close();
}
}
private class MessageEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
final NettyServerCall call = (NettyServerCall) msg;
ByteBuf response = Unpooled.wrappedBuffer(call.response.getBuffers());
ctx.write(response, promise).addListener(new CallWriteListener(call));
}
}
private class CallWriteListener implements ChannelFutureListener {
private NettyServerCall call;
CallWriteListener(NettyServerCall call) {
this.call = call;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
call.done();
if (future.isSuccess()) {
metrics.sentBytes(call.response.size());
}
}
}
@Override
public void setSocketSendBufSize(int size) {
}
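The removed inner classes are replaced by the inline pipeline shown in the childHandler hunk above: a FixedLengthFrameDecoder(6) in single-decode mode isolates the 6-byte connection preamble, NettyRpcServerPreambleHandler consumes it and then removes both itself and the preamble decoder, and everything after that flows into the length-field frame decoder, request decoder and response encoder. The sketch below reproduces that hand-off on an EmbeddedChannel; the anonymous handler, the frame size limit and the sample bytes are illustrative placeholders, not HBase code:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

// Sketch of the pipeline hand-off: a 6-byte single-decode frame decoder isolates the
// connection preamble, the preamble handler removes itself plus the preamble decoder,
// and the remaining bytes fall through to the length-field frame decoder.
public class PreamblePipelineSketch {
  public static void main(String[] args) {
    FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
    preambleDecoder.setSingleDecode(true);
    EmbeddedChannel ch = new EmbeddedChannel();
    ch.pipeline().addLast("preambleDecoder", preambleDecoder);
    ch.pipeline().addLast("preambleHandler", new SimpleChannelInboundHandler<ByteBuf>() {
      @Override
      protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        System.out.println("preamble bytes: " + msg.readableBytes()); // 6
        ctx.pipeline().remove(this);
        ctx.pipeline().remove("preambleDecoder");
      }
    });
    ch.pipeline().addLast("frameDecoder",
        new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4, true));

    // "HBas" + version + auth byte, then one length-prefixed 3-byte request.
    ch.writeInbound(Unpooled.wrappedBuffer(new byte[] {
        'H', 'B', 'a', 's', 0, 80, 0, 0, 0, 3, 1, 2, 3 }));
    ByteBuf request = ch.readInbound();
    System.out.println("request frame bytes: " + request.readableBytes()); // 3
    request.release();
  }
}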

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java (View File)

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Handle connection preamble.
*/
@InterfaceAudience.Private
class NettyRpcServerPreambleHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final NettyRpcServer rpcServer;
public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer) {
this.rpcServer = rpcServer;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
NettyServerRpcConnection conn = new NettyServerRpcConnection(rpcServer, ctx.channel());
ByteBuffer buf = ByteBuffer.allocate(msg.readableBytes());
msg.readBytes(buf);
buf.flip();
if (!conn.processPreamble(buf)) {
conn.close();
return;
}
ChannelPipeline p = ctx.pipeline();
((NettyRpcServerRequestDecoder) p.get("decoder")).setConnection(conn);
p.remove(this);
p.remove("preambleDecoder");
}
}
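A small detail in channelRead0() above: ByteBuf.readBytes(ByteBuffer) advances the destination's position, so the buffer has to be flipped before processPreamble() can parse it. A standalone illustration of that copy (the sample bytes are placeholders):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;

// The ByteBuf-to-ByteBuffer copy done before the preamble is parsed: readBytes()
// fills the destination and moves its position to the end, flip() rewinds it.
public class PreambleCopySketch {
  public static void main(String[] args) {
    ByteBuf msg = Unpooled.wrappedBuffer(new byte[] { 'H', 'B', 'a', 's', 0, 80 });
    ByteBuffer buf = ByteBuffer.allocate(msg.readableBytes());
    msg.readBytes(buf); // copies 6 bytes, buf.position() is now 6
    buf.flip();         // position back to 0 so the parser reads from the start
    System.out.println(buf.remaining() + " preamble bytes ready to parse");
    msg.release();
  }
}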

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java (View File)

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Decoder for rpc request.
*/
@InterfaceAudience.Private
class NettyRpcServerRequestDecoder extends ChannelInboundHandlerAdapter {
private final ChannelGroup allChannels;
private final MetricsHBaseServer metrics;
public NettyRpcServerRequestDecoder(ChannelGroup allChannels, MetricsHBaseServer metrics) {
this.allChannels = allChannels;
this.metrics = metrics;
}
private NettyServerRpcConnection connection;
void setConnection(NettyServerRpcConnection connection) {
this.connection = connection;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
allChannels.add(ctx.channel());
if (NettyRpcServer.LOG.isDebugEnabled()) {
NettyRpcServer.LOG.debug("Connection from " + ctx.channel().remoteAddress() +
"; # active connections: " + (allChannels.size() - 1));
}
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf input = (ByteBuf) msg;
// 4 bytes length field
metrics.receivedBytes(input.readableBytes() + 4);
connection.process(input);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
allChannels.remove(ctx.channel());
if (NettyRpcServer.LOG.isDebugEnabled()) {
NettyRpcServer.LOG.debug("Disconnecting client: " + ctx.channel().remoteAddress() +
". Number of active connections: " + (allChannels.size() - 1));
}
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
allChannels.remove(ctx.channel());
if (NettyRpcServer.LOG.isDebugEnabled()) {
NettyRpcServer.LOG.debug("Connection from " + ctx.channel().remoteAddress() +
" catch unexpected exception from downstream.",
e.getCause());
}
ctx.channel().close();
}
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java (View File)

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Encoder for {@link RpcResponse}.
*/
@InterfaceAudience.Private
class NettyRpcServerResponseEncoder extends ChannelOutboundHandlerAdapter {
private final MetricsHBaseServer metrics;
NettyRpcServerResponseEncoder(MetricsHBaseServer metrics) {
this.metrics = metrics;
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (msg instanceof RpcResponse) {
RpcResponse resp = (RpcResponse) msg;
BufferChain buf = resp.getResponse();
ctx.write(Unpooled.wrappedBuffer(buf.getBuffers()), promise).addListener(f -> {
resp.done();
if (f.isSuccess()) {
metrics.sentBytes(buf.size());
}
});
} else {
ctx.write(msg, promise);
}
}
}
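This encoder is now the single point where a response leaves the Netty-based server: it flattens the BufferChain into one wrapped ByteBuf and, once the write completes, calls done() so the call can release pooled buffers, counting sent bytes only on success. A rough standalone analogue, where ResponseSketch is a hypothetical stand-in for the RpcResponse/BufferChain pair:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import java.nio.ByteBuffer;

// Rough analogue of the response encoder: a queued response object is flattened into
// a ByteBuf at write time and a completion listener releases the response afterwards.
public class ResponseEncoderSketch {
  static final class ResponseSketch {
    final ByteBuffer[] buffers;
    ResponseSketch(ByteBuffer... buffers) { this.buffers = buffers; }
    void done() { System.out.println("response resources released"); }
  }

  public static void main(String[] args) {
    EmbeddedChannel ch = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
      @Override
      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (msg instanceof ResponseSketch) {
          ResponseSketch resp = (ResponseSketch) msg;
          ctx.write(Unpooled.wrappedBuffer(resp.buffers), promise).addListener(f -> {
            resp.done();                     // always release, success or not
            if (f.isSuccess()) {
              System.out.println("sent ok"); // the real encoder bumps sentBytes here
            }
          });
        } else {
          ctx.write(msg, promise);           // anything else passes through untouched
        }
      }
    });
    ch.writeOutbound(new ResponseSketch(ByteBuffer.wrap(new byte[] { 1, 2, 3 })));
    ByteBuf onTheWire = ch.readOutbound();
    System.out.println("bytes on the wire: " + onTheWire.readableBytes()); // 3
    onTheWire.release();
  }
}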

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java (View File)

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.ipc;
import io.netty.channel.ChannelFutureListener;
import java.io.IOException;
import java.net.InetAddress;
@ -53,10 +51,8 @@ class NettyServerCall extends ServerCall<NettyServerRpcConnection> {
*/
@Override
public synchronized void sendResponseIfReady() throws IOException {
// set param null to reduce memory pressure
this.param = null;
connection.channel.writeAndFlush(this);
}
public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException {
connection.channel.writeAndFlush(this).addListener(listener);
}
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java (View File)

@ -19,30 +19,21 @@ package org.apache.hadoop.hbase.ipc;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.htrace.TraceInfo;
/**
@ -64,75 +55,6 @@ class NettyServerRpcConnection extends ServerRpcConnection {
this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
}
this.remotePort = inetSocketAddress.getPort();
this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
null, System.currentTimeMillis(), 0, rpcServer.reservoir, rpcServer.cellBlockBuilder, null);
this.setConnectionHeaderResponseCall = new NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID,
null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0,
rpcServer.reservoir, rpcServer.cellBlockBuilder, null);
this.authFailedCall = new NettyServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null,
null, this, 0, null, null, System.currentTimeMillis(), 0, rpcServer.reservoir,
rpcServer.cellBlockBuilder, null);
}
void readPreamble(ByteBuf buffer) throws IOException {
byte[] rpcHead = { buffer.readByte(), buffer.readByte(), buffer.readByte(), buffer.readByte() };
if (!Arrays.equals(HConstants.RPC_HEADER, rpcHead)) {
doBadPreambleHandling("Expected HEADER=" + Bytes.toStringBinary(HConstants.RPC_HEADER) +
" but received HEADER=" + Bytes.toStringBinary(rpcHead) + " from " + toString());
return;
}
// Now read the next two bytes, the version and the auth to use.
int version = buffer.readByte();
byte authbyte = buffer.readByte();
this.authMethod = AuthMethod.valueOf(authbyte);
if (version != NettyRpcServer.CURRENT_VERSION) {
String msg = getFatalConnectionString(version, authbyte);
doBadPreambleHandling(msg, new WrongVersionException(msg));
return;
}
if (authMethod == null) {
String msg = getFatalConnectionString(version, authbyte);
doBadPreambleHandling(msg, new BadAuthException(msg));
return;
}
if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
if (this.rpcServer.allowFallbackToSimpleAuth) {
this.rpcServer.metrics.authenticationFallback();
authenticatedWithFallback = true;
} else {
AccessDeniedException ae = new AccessDeniedException("Authentication is required");
this.rpcServer.setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
((NettyServerCall) authFailedCall).sendResponseIfReady(ChannelFutureListener.CLOSE);
return;
}
}
if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
null);
authMethod = AuthMethod.SIMPLE;
// client has already sent the initial Sasl message and we
// should ignore it. Both client and server should fall back
// to simple auth from now on.
skipInitialSaslHandshake = true;
}
if (authMethod != AuthMethod.SIMPLE) {
useSasl = true;
}
connectionPreambleRead = true;
}
private void doBadPreambleHandling(final String msg) throws IOException {
doBadPreambleHandling(msg, new FatalConnectionException(msg));
}
private void doBadPreambleHandling(final String msg, final Exception e) throws IOException {
NettyRpcServer.LOG.warn(msg);
NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, null, this, -1, null,
null, System.currentTimeMillis(), 0, this.rpcServer.reservoir,
this.rpcServer.cellBlockBuilder, null);
this.rpcServer.setupResponse(null, fakeCall, e, msg);
// closes out the connection.
fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE);
}
void process(final ByteBuf buf) throws IOException, InterruptedException {
@ -145,9 +67,8 @@ class NettyServerRpcConnection extends ServerRpcConnection {
};
process(new SingleByteBuff(buf.nioBuffer()));
} else {
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data, 0, data.length);
ByteBuffer connectionHeader = ByteBuffer.wrap(data);
ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes());
buf.readBytes(connectionHeader);
buf.release();
process(connectionHeader);
}
@ -203,4 +124,9 @@ class NettyServerRpcConnection extends ServerRpcConnection {
remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
this.rpcServer.cellBlockBuilder, reqCleanup);
}
@Override
protected void doRespond(RpcResponse resp) {
channel.writeAndFlush(resp);
}
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcResponse.java (View File)

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* An interface represent the response of an rpc call.
*/
@InterfaceAudience.Private
interface RpcResponse {
BufferChain getResponse();
default void done() {
// nothing
}
}
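Since RpcResponse has a single abstract method plus a no-op default done(), the connection-setup paths later in this patch can answer with a plain lambda (see the doRespond(() -> bc) calls in ServerRpcConnection) instead of constructing a fake ServerCall just to carry the serialized bytes, which is the point of the commit. A toy illustration using local stand-in types:

import java.nio.ByteBuffer;

// Toy illustration (local stand-ins, not the HBase types) of why a single-method
// interface with a default done() lets setup code respond with a bare lambda.
public class RpcResponseSketch {
  interface Response {              // stand-in for RpcResponse
    ByteBuffer getResponse();
    default void done() {}          // real calls override this to release cell blocks
  }

  static void doRespond(Response resp) {
    ByteBuffer buf = resp.getResponse();
    System.out.println("writing " + buf.remaining() + " bytes");
    resp.done();
  }

  public static void main(String[] args) {
    ByteBuffer errorBody = ByteBuffer.wrap(new byte[] { 42 });
    // Connection-setup error path: no call object, just the serialized bytes to send.
    doRespond(() -> errorBody);
  }
}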

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (View File)

@ -22,7 +22,6 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@ -341,19 +340,6 @@ public abstract class RpcServer implements RpcServerInterface,
}
}
/**
* Setup response for the RPC Call.
* @param response buffer to serialize the response into
* @param call {@link ServerCall} to which we are setting up the response
* @param error error message, if the call failed
* @throws IOException
*/
protected void setupResponse(ByteArrayOutputStream response, ServerCall<?> call, Throwable t,
String error) throws IOException {
if (response != null) response.reset();
call.setResponse(null, null, t, error);
}
Configuration getConf() {
return conf;
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java (View File)

@ -51,7 +51,7 @@ import org.apache.htrace.TraceInfo;
* the result.
*/
@InterfaceAudience.Private
abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall {
abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
protected final int id; // the client's call id
protected final BlockingService service;
@ -127,7 +127,8 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall {
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
justification = "Presume the lock on processing request held by caller is protection enough")
void done() {
@Override
public void done() {
if (this.cellBlockStream != null) {
// This will return back the BBs which we got from pool.
this.cellBlockStream.releaseResources();
@ -178,18 +179,6 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall {
" deadline: " + deadline;
}
protected synchronized void setSaslTokenResponse(ByteBuffer response) {
ByteBuffer[] responseBufs = new ByteBuffer[1];
responseBufs[0] = response;
this.response = new BufferChain(responseBufs);
}
protected synchronized void setConnectionHeaderResponse(ByteBuffer response) {
ByteBuffer[] responseBufs = new ByteBuffer[1];
responseBufs[0] = response;
this.response = new BufferChain(responseBufs);
}
@Override
public synchronized void setResponse(Message m, final CellScanner cells,
Throwable t, String errorMsg) {
@ -268,7 +257,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall {
}
}
protected void setExceptionResponse(Throwable t, String errorMsg,
static void setExceptionResponse(Throwable t, String errorMsg,
ResponseHeader.Builder headerBuilder) {
ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
exceptionBuilder.setExceptionClassName(t.getClass().getName());
@ -286,7 +275,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall {
headerBuilder.setException(exceptionBuilder.build());
}
protected ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
static ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException {
// Organize the response as a set of bytebuffers rather than collect it all together inside
// one big byte array; save on allocations.
@ -336,7 +325,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall {
}
}
private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
private static void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
throws IOException {
ByteBufferUtils.putInt(pbBuf, totalSize);
// create COS that works on BB
@ -351,7 +340,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall {
cos.checkNoSpaceLeft();
}
private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
private static ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
int totalSize, int totalPBSize) throws IOException {
ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
writeToCOS(result, header, totalSize, pbBuf);
@ -523,4 +512,8 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall {
return tinfo;
}
@Override
public synchronized BufferChain getResponse() {
return response;
}
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java (View File)

@ -17,8 +17,9 @@
*/
package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.HConstants.RPC_HEADER;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
@ -68,9 +69,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
@ -89,8 +92,6 @@ import org.apache.htrace.TraceInfo;
abstract class ServerRpcConnection implements Closeable {
/** */
protected final RpcServer rpcServer;
// If initial preamble with version and magic has been read or not.
protected boolean connectionPreambleRead = false;
// If the connection header has been read or not.
protected boolean connectionHeaderRead = false;
@ -124,17 +125,6 @@ abstract class ServerRpcConnection implements Closeable {
protected CryptoAES cryptoAES;
protected boolean useWrap = false;
protected boolean useCryptoAesWrap = false;
// Fake 'call' for failed authorization response
protected static final int AUTHORIZATION_FAILED_CALLID = -1;
protected ServerCall<?> authFailedCall;
protected ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
protected static final int SASL_CALLID = -33;
protected ServerCall<?> saslCall;
// Fake 'call' for connection header response
protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
protected ServerCall<?> setConnectionHeaderResponseCall;
// was authentication allowed with a fallback to simple auth
protected boolean authenticatedWithFallback;
@ -340,15 +330,13 @@ abstract class ServerRpcConnection implements Closeable {
/**
* No protobuf encoding of raw sasl messages
*/
protected void doRawSaslReply(SaslStatus status, Writable rv,
protected final void doRawSaslReply(SaslStatus status, Writable rv,
String errorClass, String error) throws IOException {
ByteBufferOutputStream saslResponse = null;
DataOutputStream out = null;
try {
BufferChain bc;
// In my testing, have noticed that sasl messages are usually
// in the ballpark of 100-200. That's why the initial capacity is 256.
saslResponse = new ByteBufferOutputStream(256);
out = new DataOutputStream(saslResponse);
try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
DataOutputStream out = new DataOutputStream(saslResponse)) {
out.writeInt(status.state); // write status
if (status == SaslStatus.SUCCESS) {
rv.write(out);
@ -356,16 +344,9 @@ abstract class ServerRpcConnection implements Closeable {
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
saslCall.sendResponseIfReady();
} finally {
if (saslResponse != null) {
saslResponse.close();
}
if (out != null) {
out.close();
}
bc = new BufferChain(saslResponse.getByteBuffer());
}
doRespond(() -> bc);
}
public void saslReadAndProcess(ByteBuff saslToken) throws IOException,
@ -481,8 +462,7 @@ abstract class ServerRpcConnection implements Closeable {
}
}
private void processUnwrappedData(byte[] inBuf) throws IOException,
InterruptedException {
private void processUnwrappedData(byte[] inBuf) throws IOException, InterruptedException {
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
// Read all RPCs contained in the inBuf, even partial ones
while (true) {
@ -536,7 +516,7 @@ abstract class ServerRpcConnection implements Closeable {
}
}
protected boolean authorizeConnection() throws IOException {
private boolean authorizeConnection() throws IOException {
try {
// If auth method is DIGEST, the token was obtained by the
// real user for the effective user, therefore not required to
@ -553,16 +533,14 @@ abstract class ServerRpcConnection implements Closeable {
RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
}
this.rpcServer.metrics.authorizationFailure();
this.rpcServer.setupResponse(authFailedResponse, authFailedCall,
new AccessDeniedException(ae), ae.getMessage());
authFailedCall.sendResponseIfReady();
doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae)));
return false;
}
return true;
}
// Reads the connection header following version
protected void processConnectionHeader(ByteBuff buf) throws IOException {
private void processConnectionHeader(ByteBuff buf) throws IOException {
if (buf.hasArray()) {
this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
} else {
@ -630,6 +608,9 @@ abstract class ServerRpcConnection implements Closeable {
}
}
/**
* Send the response for connection header
*/
private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
throws FatalConnectionException {
// Response the connection header if Crypto AES is enabled
@ -640,38 +621,21 @@ abstract class ServerRpcConnection implements Closeable {
byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length);
doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, unwrapped.length));
byte[] wrapped = saslServer.wrap(unwrapped, 0, unwrapped.length);
BufferChain bc;
try (ByteBufferOutputStream response = new ByteBufferOutputStream(wrapped.length + 4);
DataOutputStream out = new DataOutputStream(response)) {
out.writeInt(wrapped.length);
out.write(wrapped);
bc = new BufferChain(response.getByteBuffer());
}
doRespond(() -> bc);
} catch (IOException ex) {
throw new UnsupportedCryptoException(ex.getMessage(), ex);
}
}
/**
* Send the response for connection header
*/
private void doConnectionHeaderResponse(byte[] wrappedCipherMetaData)
throws IOException {
ByteBufferOutputStream response = null;
DataOutputStream out = null;
try {
response = new ByteBufferOutputStream(wrappedCipherMetaData.length + 4);
out = new DataOutputStream(response);
out.writeInt(wrappedCipherMetaData.length);
out.write(wrappedCipherMetaData);
setConnectionHeaderResponseCall.setConnectionHeaderResponse(response
.getByteBuffer());
setConnectionHeaderResponseCall.sendResponseIfReady();
} finally {
if (out != null) {
out.close();
}
if (response != null) {
response.close();
}
}
}
protected abstract void doRespond(RpcResponse resp) throws IOException;
/**
* @param buf
@ -709,14 +673,14 @@ abstract class ServerRpcConnection implements Closeable {
// Enforcing the call queue size, this triggers a retry in the client
// This is a bit late to be doing this check - we have already read in the
// total request.
if ((totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) {
if ((totalRequestSize +
this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) {
final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null,
totalRequestSize, null, null, 0, this.callCleanup);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
this.rpcServer.setupResponse(responseBuffer, callTooBig, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + this.rpcServer.server.getServerName()
+ ", is hbase.ipc.server.max.callqueue.size too small?");
callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + this.rpcServer.server.getServerName() +
", is hbase.ipc.server.max.callqueue.size too small?");
callTooBig.sendResponseIfReady();
return;
}
@ -773,11 +737,9 @@ abstract class ServerRpcConnection implements Closeable {
t = new DoNotRetryIOException(t);
}
final ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null,
ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null,
totalRequestSize, null, null, 0, this.callCleanup);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
this.rpcServer.setupResponse(responseBuffer, readParamsFailedCall, t,
msg + "; " + t.getMessage());
readParamsFailedCall.setResponse(null, null, t, msg + "; " + t.getMessage());
readParamsFailedCall.sendResponseIfReady();
return;
}
@ -794,16 +756,81 @@ abstract class ServerRpcConnection implements Closeable {
if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
this.rpcServer.setupResponse(responseBuffer, call, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + this.rpcServer.server.getServerName()
+ ", too many items queued ?");
call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + this.rpcServer.server.getServerName() +
", too many items queued ?");
call.sendResponseIfReady();
}
}
protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException {
ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1);
ServerCall.setExceptionResponse(e, msg, headerBuilder);
ByteBuffer headerBuf =
ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null);
BufferChain buf = new BufferChain(headerBuf);
return () -> buf;
}
private void doBadPreambleHandling(String msg) throws IOException {
doBadPreambleHandling(msg, new FatalConnectionException(msg));
}
private void doBadPreambleHandling(String msg, Exception e) throws IOException {
SimpleRpcServer.LOG.warn(msg);
doRespond(getErrorResponse(msg, e));
}
protected final boolean processPreamble(ByteBuffer preambleBuffer) throws IOException {
assert preambleBuffer.remaining() == 6;
for (int i = 0; i < RPC_HEADER.length; i++) {
if (RPC_HEADER[i] != preambleBuffer.get()) {
doBadPreambleHandling(
"Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER=" +
Bytes.toStringBinary(preambleBuffer.array(), 0, RPC_HEADER.length) + " from " +
toString());
return false;
}
}
int version = preambleBuffer.get() & 0xFF;
byte authbyte = preambleBuffer.get();
this.authMethod = AuthMethod.valueOf(authbyte);
if (version != SimpleRpcServer.CURRENT_VERSION) {
String msg = getFatalConnectionString(version, authbyte);
doBadPreambleHandling(msg, new WrongVersionException(msg));
return false;
}
if (authMethod == null) {
String msg = getFatalConnectionString(version, authbyte);
doBadPreambleHandling(msg, new BadAuthException(msg));
return false;
}
if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
if (this.rpcServer.allowFallbackToSimpleAuth) {
this.rpcServer.metrics.authenticationFallback();
authenticatedWithFallback = true;
} else {
AccessDeniedException ae = new AccessDeniedException("Authentication is required");
doRespond(getErrorResponse(ae.getMessage(), ae));
return false;
}
}
if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
null);
authMethod = AuthMethod.SIMPLE;
// client has already sent the initial Sasl message and we
// should ignore it. Both client and server should fall back
// to simple auth from now on.
skipInitialSaslHandshake = true;
}
if (authMethod != AuthMethod.SIMPLE) {
useSasl = true;
}
return true;
}
public abstract boolean isConnectionOpen();
public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md,
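processPreamble() above now holds the preamble validation that NettyServerRpcConnection and SimpleServerRpcConnection previously duplicated, and every failure path responds through doRespond(getErrorResponse(...)) rather than a fake call. The standalone sketch below mirrors its structure; RPC_HEADER, CURRENT_VERSION and the SIMPLE auth byte are local copies assumed to match the HBase constants ("HBas", 0 and 80 respectively):

import java.nio.ByteBuffer;
import java.util.Arrays;

// Standalone mirror of the shared 6-byte preamble check: 4 magic bytes, 1 version
// byte, 1 auth-method byte. The constants are local copies assumed to match HBase.
public class PreambleCheckSketch {
  private static final byte[] RPC_HEADER = { 'H', 'B', 'a', 's' };
  private static final int CURRENT_VERSION = 0;
  private static final byte AUTH_SIMPLE = 80;

  static boolean processPreamble(ByteBuffer preamble) {
    assert preamble.remaining() == 6;
    byte[] magic = new byte[RPC_HEADER.length];
    preamble.get(magic);
    if (!Arrays.equals(RPC_HEADER, magic)) {
      System.out.println("bad magic, would respond with an error and close");
      return false;
    }
    int version = preamble.get() & 0xFF; // read unsigned, as in the patch
    byte authByte = preamble.get();
    if (version != CURRENT_VERSION) {
      System.out.println("wrong version " + version);
      return false;
    }
    System.out.println("auth byte " + authByte + ", preamble accepted");
    return true;
  }

  public static void main(String[] args) {
    ByteBuffer ok = ByteBuffer.wrap(new byte[] { 'H', 'B', 'a', 's', 0, AUTH_SIMPLE });
    System.out.println(processPreamble(ok)); // true
  }
}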

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java (View File)

@ -38,11 +38,11 @@ import org.apache.hadoop.util.StringUtils;
*/
@InterfaceAudience.Private
class SimpleRpcServerResponder extends Thread {
/** */
private final SimpleRpcServer simpleRpcServer;
private final Selector writeSelector;
private final Set<SimpleServerRpcConnection> writingCons =
Collections.newSetFromMap(new ConcurrentHashMap<SimpleServerRpcConnection, Boolean>());
Collections.newSetFromMap(new ConcurrentHashMap<>());
SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException {
this.simpleRpcServer = simpleRpcServer;
@ -175,9 +175,9 @@ class SimpleRpcServerResponder extends Thread {
if (connection == null) {
throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
}
SimpleServerCall call = connection.responseQueue.peekFirst();
if (call != null && now > call.lastSentTime + this.simpleRpcServer.purgeTimeout) {
conWithOldCalls.add(call.getConnection());
if (connection.lastSentTime > 0 &&
now > connection.lastSentTime + this.simpleRpcServer.purgeTimeout) {
conWithOldCalls.add(connection);
}
}
}
@ -217,35 +217,37 @@ class SimpleRpcServerResponder extends Thread {
/**
* Process the response for this call. You need to have the lock on
* {@link org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection#responseWriteLock}
* @param call the call
* @return true if we proceed the call fully, false otherwise.
* @throws IOException
*/
boolean processResponse(final SimpleServerCall call) throws IOException {
private boolean processResponse(SimpleServerRpcConnection conn, RpcResponse resp)
throws IOException {
boolean error = true;
BufferChain buf = resp.getResponse();
try {
// Send as much data as we can in the non-blocking fashion
long numBytes =
this.simpleRpcServer.channelWrite(call.getConnection().channel, call.response);
this.simpleRpcServer.channelWrite(conn.channel, buf);
if (numBytes < 0) {
throw new HBaseIOException(
"Error writing on the socket " + "for the call:" + call.toShortString());
throw new HBaseIOException("Error writing on the socket " + conn);
}
error = false;
} finally {
if (error) {
SimpleRpcServer.LOG.debug(getName() + call.toShortString() + ": output error -- closing");
SimpleRpcServer.LOG.debug(conn + ": output error -- closing");
// We will be closing this connection itself. Mark this call as done so that all the
// buffer(s) it got from pool can get released
call.done();
this.simpleRpcServer.closeConnection(call.getConnection());
resp.done();
this.simpleRpcServer.closeConnection(conn);
}
}
if (!call.response.hasRemaining()) {
call.done();
if (!buf.hasRemaining()) {
resp.done();
return true;
} else {
// set the serve time when the response has to be sent later
conn.lastSentTime = System.currentTimeMillis();
return false; // Socket can't take more, we will have to come back.
}
}
@ -263,12 +265,12 @@ class SimpleRpcServerResponder extends Thread {
try {
for (int i = 0; i < 20; i++) {
// protection if some handlers manage to need all the responder
SimpleServerCall call = connection.responseQueue.pollFirst();
if (call == null) {
RpcResponse resp = connection.responseQueue.pollFirst();
if (resp == null) {
return true;
}
if (!processResponse(call)) {
connection.responseQueue.addFirst(call);
if (!processResponse(connection, resp)) {
connection.responseQueue.addFirst(resp);
return false;
}
}
@ -282,35 +284,30 @@ class SimpleRpcServerResponder extends Thread {
//
// Enqueue a response from the application.
//
void doRespond(SimpleServerCall call) throws IOException {
void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws IOException {
boolean added = false;
// If there is already a write in progress, we don't wait. This allows to free the handlers
// immediately for other tasks.
if (call.getConnection().responseQueue.isEmpty() &&
call.getConnection().responseWriteLock.tryLock()) {
if (conn.responseQueue.isEmpty() && conn.responseWriteLock.tryLock()) {
try {
if (call.getConnection().responseQueue.isEmpty()) {
if (conn.responseQueue.isEmpty()) {
// If we're alone, we can try to do a direct call to the socket. It's
// an optimisation to save on context switches and data transfer between cores..
if (processResponse(call)) {
// an optimization to save on context switches and data transfer between cores..
if (processResponse(conn, resp)) {
return; // we're done.
}
// Too big to fit, putting ahead.
call.getConnection().responseQueue.addFirst(call);
conn.responseQueue.addFirst(resp);
added = true; // We will register to the selector later, outside of the lock.
}
} finally {
call.getConnection().responseWriteLock.unlock();
conn.responseWriteLock.unlock();
}
}
if (!added) {
call.getConnection().responseQueue.addLast(call);
conn.responseQueue.addLast(resp);
}
call.responder.registerForWrite(call.getConnection());
// set the serve time when the response has to be sent later
call.lastSentTime = System.currentTimeMillis();
registerForWrite(conn);
}
}
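doRespond() above keeps the existing fast path, now keyed on the connection instead of the call: if the response queue is empty and the write lock is free, the handler thread writes inline; otherwise the RpcResponse is queued, the connection is registered with the write selector, and lastSentTime on the connection (rather than on a SimpleServerCall) drives purging of stale writes. A simplified, socket-free sketch of that queueing strategy, with a String payload and a length check standing in for a real, possibly partial, socket write:

import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.ReentrantLock;

// Socket-free sketch of the responder's enqueue strategy: write inline from the
// handler thread when nothing is queued and the lock is free, otherwise queue the
// response for the responder thread to drain later.
public class ResponderQueueSketch {
  private final ConcurrentLinkedDeque<String> responseQueue = new ConcurrentLinkedDeque<>();
  private final ReentrantLock responseWriteLock = new ReentrantLock();

  private boolean processResponse(String resp) {
    boolean complete = resp.length() <= 8;   // pretend short responses fit in one write
    System.out.println((complete ? "wrote " : "partial write of ") + resp);
    return complete;
  }

  void doRespond(String resp) {
    boolean added = false;
    if (responseQueue.isEmpty() && responseWriteLock.tryLock()) {
      try {
        if (responseQueue.isEmpty()) {
          if (processResponse(resp)) {
            return;                          // fully written inline, nothing to queue
          }
          responseQueue.addFirst(resp);      // keep ordering for the partial write
          added = true;
        }
      } finally {
        responseWriteLock.unlock();
      }
    }
    if (!added) {
      responseQueue.addLast(resp);
    }
    System.out.println("registered for async write, queue=" + responseQueue);
  }

  public static void main(String[] args) {
    ResponderQueueSketch responder = new ResponderQueueSketch();
    responder.doRespond("short");                  // written inline
    responder.doRespond("a-much-longer-response"); // queued for the responder thread
  }
}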

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java (View File)

@ -37,8 +37,6 @@ import org.apache.htrace.TraceInfo;
@InterfaceAudience.Private
class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> {
long lastSentTime;
final SimpleRpcServerResponder responder;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
@ -59,7 +57,7 @@ class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
justification = "Presume the lock on processing request held by caller is protection enough")
@Override
void done() {
public void done() {
super.done();
this.getConnection().decRpcCount(); // Say that we're done with this call.
}
@ -68,10 +66,10 @@ class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> {
public synchronized void sendResponseIfReady() throws IOException {
// set param null to reduce memory pressure
this.param = null;
this.responder.doRespond(this);
this.responder.doRespond(getConnection(), this);
}
SimpleServerRpcConnection getConnection() {
return (SimpleServerRpcConnection) this.connection;
return this.connection;
}
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java (View File)

@ -24,7 +24,6 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
@ -32,26 +31,19 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.IntWritable;
import org.apache.htrace.TraceInfo;
/** Reads calls from a connection and queues them for handling. */
@ -64,13 +56,17 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
private ByteBuff data;
private ByteBuffer dataLengthBuffer;
private ByteBuffer preambleBuffer;
protected final ConcurrentLinkedDeque<SimpleServerCall> responseQueue =
new ConcurrentLinkedDeque<>();
final Lock responseWriteLock = new ReentrantLock();
private final LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs
private long lastContact;
private final Socket socket;
private final SimpleRpcServerResponder responder;
final SimpleRpcServerResponder responder;
// If initial preamble with version and magic has been read or not.
private boolean connectionPreambleRead = false;
final ConcurrentLinkedDeque<RpcResponse> responseQueue = new ConcurrentLinkedDeque<>();
final Lock responseWriteLock = new ReentrantLock();
long lastSentTime = -1L;
public SimpleServerRpcConnection(SimpleRpcServer rpcServer, SocketChannel channel,
long lastContact) {
@ -95,15 +91,6 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
"Connection: unable to set socket send buffer size to " + rpcServer.socketSendBufferSize);
}
}
this.saslCall = new SimpleServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
null, System.currentTimeMillis(), 0, rpcServer.reservoir, rpcServer.cellBlockBuilder, null,
rpcServer.responder);
this.setConnectionHeaderResponseCall = new SimpleServerCall(CONNECTION_HEADER_RESPONSE_CALLID,
null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0,
rpcServer.reservoir, rpcServer.cellBlockBuilder, null, rpcServer.responder);
this.authFailedCall = new SimpleServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null,
null, this, 0, null, null, System.currentTimeMillis(), 0, rpcServer.reservoir,
rpcServer.cellBlockBuilder, null, rpcServer.responder);
this.responder = rpcServer.responder;
}
@ -138,49 +125,9 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
if (count < 0 || preambleBuffer.remaining() > 0) {
return count;
}
// Check for 'HBas' magic.
preambleBuffer.flip();
for (int i = 0; i < HConstants.RPC_HEADER.length; i++) {
if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) {
return doBadPreambleHandling("Expected HEADER=" +
Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" +
Bytes.toStringBinary(preambleBuffer.array(), 0, HConstants.RPC_HEADER.length) +
" from " + toString());
}
}
int version = preambleBuffer.get(HConstants.RPC_HEADER.length);
byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1);
this.authMethod = AuthMethod.valueOf(authbyte);
if (version != SimpleRpcServer.CURRENT_VERSION) {
String msg = getFatalConnectionString(version, authbyte);
return doBadPreambleHandling(msg, new WrongVersionException(msg));
}
if (authMethod == null) {
String msg = getFatalConnectionString(version, authbyte);
return doBadPreambleHandling(msg, new BadAuthException(msg));
}
if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
if (this.rpcServer.allowFallbackToSimpleAuth) {
this.rpcServer.metrics.authenticationFallback();
authenticatedWithFallback = true;
} else {
AccessDeniedException ae = new AccessDeniedException("Authentication is required");
this.rpcServer.setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
authFailedCall.sendResponseIfReady();
throw ae;
}
}
if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
null);
authMethod = AuthMethod.SIMPLE;
// client has already sent the initial Sasl message and we
// should ignore it. Both client and server should fall back
// to simple auth from now on.
skipInitialSaslHandshake = true;
}
if (authMethod != AuthMethod.SIMPLE) {
useSasl = true;
if (!processPreamble(preambleBuffer)) {
return -1;
}
preambleBuffer = null; // do not need it anymore
connectionPreambleRead = true;
@ -272,19 +219,15 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
// Otherwise, throw a DoNotRetryIOException.
if (VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) {
this.rpcServer.setupResponse(null, reqTooBig, SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION,
msg);
reqTooBig.setResponse(null, null, SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION, msg);
} else {
this.rpcServer.setupResponse(null, reqTooBig, new DoNotRetryIOException(), msg);
}
// We are going to close the connection, make sure we process the response
// before that. In rare case when this fails, we still close the connection.
responseWriteLock.lock();
try {
this.responder.processResponse(reqTooBig);
} finally {
responseWriteLock.unlock();
reqTooBig.setResponse(null, null, new DoNotRetryIOException(), msg);
}
// In most cases we will write out the response directly. If not, it is still OK to just
// close the connection without writing out the reqTooBig response. Do not try to write
// out directly here, and it will cause deserialization error if the connection is slow
// and we have a half writing response in the queue.
reqTooBig.sendResponseIfReady();
}
// Close the connection
return -1;
@ -365,21 +308,6 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
}
}
private int doBadPreambleHandling(final String msg) throws IOException {
return doBadPreambleHandling(msg, new FatalConnectionException(msg));
}
private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
SimpleRpcServer.LOG.warn(msg);
SimpleServerCall fakeCall = new SimpleServerCall(-1, null, null, null, null, null, this, -1,
null, null, System.currentTimeMillis(), 0, this.rpcServer.reservoir,
this.rpcServer.cellBlockBuilder, null, responder);
this.rpcServer.setupResponse(null, fakeCall, e, msg);
this.responder.doRespond(fakeCall);
// Returning -1 closes out the connection.
return -1;
}
@Override
public synchronized void close() {
disposeSasl();
@ -421,4 +349,9 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
}
@Override
protected void doRespond(RpcResponse resp) throws IOException {
responder.doRespond(this, resp);
}
}