diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java index 9f6b7b53a2f..6ed90adfafd 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.nio; +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; @@ -27,8 +29,6 @@ import org.apache.hadoop.hbase.util.ObjectIntPair; import org.apache.hadoop.hbase.util.UnsafeAccess; import org.apache.hadoop.hbase.util.UnsafeAvailChecker; -import com.google.common.annotations.VisibleForTesting; - import sun.nio.ch.DirectBuffer; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index f16fc5077c0..f476b111b46 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -75,8 +75,8 @@ public class CallRunner { * @deprecated As of release 2.0, this will be removed in HBase 3.0 */ @Deprecated - public ServerCall getCall() { - return (ServerCall) call; + public ServerCall getCall() { + return (ServerCall) call; } public void setStatus(MonitoredRPCHandler status) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index c18b894a3f7..4a4ddbac96d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.ipc; import io.netty.bootstrap.ServerBootstrap; @@ -46,10 +45,7 @@ import io.netty.util.concurrent.GlobalEventExecutor; import java.io.IOException; import java.io.InterruptedIOException; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -57,31 +53,21 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -import org.apache.hadoop.hbase.nio.ByteBuff; -import org.apache.hadoop.hbase.nio.SingleByteBuff; -import org.apache.hadoop.hbase.security.AccessDeniedException; -import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.hadoop.hbase.security.HBasePolicyProvider; -import org.apache.hadoop.hbase.security.SaslStatus; -import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVM; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; -import org.apache.htrace.TraceInfo; /** * An RPC server with Netty4 implementation. - * */ +@InterfaceAudience.Private public class NettyRpcServer extends RpcServer { public static final Log LOG = LogFactory.getLog(NettyRpcServer.class); @@ -187,166 +173,6 @@ public class NettyRpcServer extends RpcServer { return ((InetSocketAddress) serverChannel.localAddress()); } - public class NettyConnection extends RpcServer.Connection { - - protected Channel channel; - - NettyConnection(Channel channel) { - super(); - this.channel = channel; - InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress()); - this.addr = inetSocketAddress.getAddress(); - if (addr == null) { - this.hostAddress = "*Unknown*"; - } else { - this.hostAddress = inetSocketAddress.getAddress().getHostAddress(); - } - this.remotePort = inetSocketAddress.getPort(); - this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null, - null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null); - this.setConnectionHeaderResponseCall = - new NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null, this, - 0, null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null); - this.authFailedCall = - new NettyServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, 0, - null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null); - } - - void readPreamble(ByteBuf buffer) throws IOException { - byte[] rpcHead = - { buffer.readByte(), buffer.readByte(), buffer.readByte(), buffer.readByte() }; - if (!Arrays.equals(HConstants.RPC_HEADER, rpcHead)) { - doBadPreambleHandling("Expected HEADER=" - + Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" - + Bytes.toStringBinary(rpcHead) + " from " + toString()); - return; - } - // Now read the next two bytes, the version and the auth to use. - int version = buffer.readByte(); - byte authbyte = buffer.readByte(); - this.authMethod = AuthMethod.valueOf(authbyte); - if (version != CURRENT_VERSION) { - String msg = getFatalConnectionString(version, authbyte); - doBadPreambleHandling(msg, new WrongVersionException(msg)); - return; - } - if (authMethod == null) { - String msg = getFatalConnectionString(version, authbyte); - doBadPreambleHandling(msg, new BadAuthException(msg)); - return; - } - if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) { - if (allowFallbackToSimpleAuth) { - metrics.authenticationFallback(); - authenticatedWithFallback = true; - } else { - AccessDeniedException ae = new AccessDeniedException( - "Authentication is required"); - setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage()); - ((NettyServerCall) authFailedCall) - .sendResponseIfReady(ChannelFutureListener.CLOSE); - return; - } - } - if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) { - doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, - null); - authMethod = AuthMethod.SIMPLE; - // client has already sent the initial Sasl message and we - // should ignore it. Both client and server should fall back - // to simple auth from now on. - skipInitialSaslHandshake = true; - } - if (authMethod != AuthMethod.SIMPLE) { - useSasl = true; - } - connectionPreambleRead = true; - } - - private void doBadPreambleHandling(final String msg) throws IOException { - doBadPreambleHandling(msg, new FatalConnectionException(msg)); - } - - private void doBadPreambleHandling(final String msg, final Exception e) throws IOException { - LOG.warn(msg); - NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, null, this, -1, - null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null); - setupResponse(null, fakeCall, e, msg); - // closes out the connection. - fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE); - } - - void process(final ByteBuf buf) throws IOException, InterruptedException { - if (connectionHeaderRead) { - this.callCleanup = new RpcServer.CallCleanup() { - @Override - public void run() { - buf.release(); - } - }; - process(new SingleByteBuff(buf.nioBuffer())); - } else { - byte[] data = new byte[buf.readableBytes()]; - buf.readBytes(data, 0, data.length); - ByteBuffer connectionHeader = ByteBuffer.wrap(data); - buf.release(); - process(connectionHeader); - } - } - - void process(ByteBuffer buf) throws IOException, InterruptedException { - process(new SingleByteBuff(buf)); - } - - void process(ByteBuff buf) throws IOException, InterruptedException { - try { - if (skipInitialSaslHandshake) { - skipInitialSaslHandshake = false; - if (callCleanup != null) { - callCleanup.run(); - } - return; - } - - if (useSasl) { - saslReadAndProcess(buf); - } else { - processOneRpc(buf); - } - } catch (Exception e) { - if (callCleanup != null) { - callCleanup.run(); - } - throw e; - } finally { - this.callCleanup = null; - } - } - - @Override - public synchronized void close() { - disposeSasl(); - channel.close(); - callCleanup = null; - } - - @Override - public boolean isConnectionOpen() { - return channel.isOpen(); - } - - @Override - public ServerCall createCall(int id, final BlockingService service, - final MethodDescriptor md, RequestHeader header, Message param, - CellScanner cellScanner, RpcServer.Connection connection, long size, - TraceInfo tinfo, final InetAddress remoteAddress, int timeout, - CallCleanup reqCleanup) { - return new NettyServerCall(id, service, md, header, param, cellScanner, connection, size, - tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder, - reqCleanup); - } - } - private class Initializer extends ChannelInitializer { final int maxRequestSize; @@ -368,7 +194,7 @@ public class NettyRpcServer extends RpcServer { } private class ConnectionHeaderHandler extends ByteToMessageDecoder { - private NettyConnection connection; + private NettyServerRpcConnection connection; ConnectionHeaderHandler() { } @@ -379,7 +205,7 @@ public class NettyRpcServer extends RpcServer { if (byteBuf.readableBytes() < 6) { return; } - connection = new NettyConnection(ctx.channel()); + connection = new NettyServerRpcConnection(NettyRpcServer.this, ctx.channel()); connection.readPreamble(byteBuf); ((MessageDecoder) ctx.pipeline().get("decoder")) .setConnection(connection); @@ -390,9 +216,9 @@ public class NettyRpcServer extends RpcServer { private class MessageDecoder extends ChannelInboundHandlerAdapter { - private NettyConnection connection; + private NettyServerRpcConnection connection; - void setConnection(NettyConnection connection) { + void setConnection(NettyServerRpcConnection connection) { this.connection = connection; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java index a3f23dd9d7d..3cb9a5a7acc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java @@ -25,7 +25,6 @@ import java.net.InetAddress; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.ByteBufferPool; -import org.apache.hadoop.hbase.ipc.NettyRpcServer.NettyConnection; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; @@ -38,30 +37,26 @@ import org.apache.htrace.TraceInfo; * result. */ @InterfaceAudience.Private -class NettyServerCall extends ServerCall { +class NettyServerCall extends ServerCall { NettyServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header, - Message param, CellScanner cellScanner, RpcServer.Connection connection, long size, + Message param, CellScanner cellScanner, NettyServerRpcConnection connection, long size, TraceInfo tinfo, InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) { super(id, service, md, header, param, cellScanner, connection, size, tinfo, remoteAddress, receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup); } - NettyConnection getConnection() { - return (NettyConnection) this.connection; - } - /** * If we have a response, and delay is not set, then respond immediately. Otherwise, do not * respond to client. This is called by the RPC code in the context of the Handler thread. */ @Override public synchronized void sendResponseIfReady() throws IOException { - getConnection().channel.writeAndFlush(this); + connection.channel.writeAndFlush(this); } public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException { - getConnection().channel.writeAndFlush(this).addListener(listener); + connection.channel.writeAndFlush(this).addListener(listener); } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java new file mode 100644 index 00000000000..7985295ebc0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; +import org.apache.hadoop.hbase.security.AccessDeniedException; +import org.apache.hadoop.hbase.security.AuthMethod; +import org.apache.hadoop.hbase.security.SaslStatus; +import org.apache.hadoop.hbase.security.SaslUtil; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.IntWritable; +import org.apache.htrace.TraceInfo; + +/** + * RpcConnection implementation for netty rpc server. + */ +@InterfaceAudience.Private +class NettyServerRpcConnection extends ServerRpcConnection { + + final Channel channel; + + NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) { + super(rpcServer); + this.channel = channel; + InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress()); + this.addr = inetSocketAddress.getAddress(); + if (addr == null) { + this.hostAddress = "*Unknown*"; + } else { + this.hostAddress = inetSocketAddress.getAddress().getHostAddress(); + } + this.remotePort = inetSocketAddress.getPort(); + this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null, + null, System.currentTimeMillis(), 0, rpcServer.reservoir, rpcServer.cellBlockBuilder, null); + this.setConnectionHeaderResponseCall = new NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID, + null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0, + rpcServer.reservoir, rpcServer.cellBlockBuilder, null); + this.authFailedCall = new NettyServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null, + null, this, 0, null, null, System.currentTimeMillis(), 0, rpcServer.reservoir, + rpcServer.cellBlockBuilder, null); + } + + void readPreamble(ByteBuf buffer) throws IOException { + byte[] rpcHead = { buffer.readByte(), buffer.readByte(), buffer.readByte(), buffer.readByte() }; + if (!Arrays.equals(HConstants.RPC_HEADER, rpcHead)) { + doBadPreambleHandling("Expected HEADER=" + Bytes.toStringBinary(HConstants.RPC_HEADER) + + " but received HEADER=" + Bytes.toStringBinary(rpcHead) + " from " + toString()); + return; + } + // Now read the next two bytes, the version and the auth to use. + int version = buffer.readByte(); + byte authbyte = buffer.readByte(); + this.authMethod = AuthMethod.valueOf(authbyte); + if (version != NettyRpcServer.CURRENT_VERSION) { + String msg = getFatalConnectionString(version, authbyte); + doBadPreambleHandling(msg, new WrongVersionException(msg)); + return; + } + if (authMethod == null) { + String msg = getFatalConnectionString(version, authbyte); + doBadPreambleHandling(msg, new BadAuthException(msg)); + return; + } + if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) { + if (this.rpcServer.allowFallbackToSimpleAuth) { + this.rpcServer.metrics.authenticationFallback(); + authenticatedWithFallback = true; + } else { + AccessDeniedException ae = new AccessDeniedException("Authentication is required"); + this.rpcServer.setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage()); + ((NettyServerCall) authFailedCall).sendResponseIfReady(ChannelFutureListener.CLOSE); + return; + } + } + if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) { + doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, + null); + authMethod = AuthMethod.SIMPLE; + // client has already sent the initial Sasl message and we + // should ignore it. Both client and server should fall back + // to simple auth from now on. + skipInitialSaslHandshake = true; + } + if (authMethod != AuthMethod.SIMPLE) { + useSasl = true; + } + connectionPreambleRead = true; + } + + private void doBadPreambleHandling(final String msg) throws IOException { + doBadPreambleHandling(msg, new FatalConnectionException(msg)); + } + + private void doBadPreambleHandling(final String msg, final Exception e) throws IOException { + NettyRpcServer.LOG.warn(msg); + NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, null, this, -1, null, + null, System.currentTimeMillis(), 0, this.rpcServer.reservoir, + this.rpcServer.cellBlockBuilder, null); + this.rpcServer.setupResponse(null, fakeCall, e, msg); + // closes out the connection. + fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE); + } + + void process(final ByteBuf buf) throws IOException, InterruptedException { + if (connectionHeaderRead) { + this.callCleanup = new RpcServer.CallCleanup() { + @Override + public void run() { + buf.release(); + } + }; + process(new SingleByteBuff(buf.nioBuffer())); + } else { + byte[] data = new byte[buf.readableBytes()]; + buf.readBytes(data, 0, data.length); + ByteBuffer connectionHeader = ByteBuffer.wrap(data); + buf.release(); + process(connectionHeader); + } + } + + void process(ByteBuffer buf) throws IOException, InterruptedException { + process(new SingleByteBuff(buf)); + } + + void process(ByteBuff buf) throws IOException, InterruptedException { + try { + if (skipInitialSaslHandshake) { + skipInitialSaslHandshake = false; + if (callCleanup != null) { + callCleanup.run(); + } + return; + } + + if (useSasl) { + saslReadAndProcess(buf); + } else { + processOneRpc(buf); + } + } catch (Exception e) { + if (callCleanup != null) { + callCleanup.run(); + } + throw e; + } finally { + this.callCleanup = null; + } + } + + @Override + public synchronized void close() { + disposeSasl(); + channel.close(); + callCleanup = null; + } + + @Override + public boolean isConnectionOpen() { + return channel.isOpen(); + } + + @Override + public NettyServerCall createCall(int id, final BlockingService service, + final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, + long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout, + CallCleanup reqCleanup) { + return new NettyServerCall(id, service, md, header, param, cellScanner, this, size, tinfo, + remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir, + this.rpcServer.cellBlockBuilder, reqCleanup); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index bbc329c42ee..d68a05e1239 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -20,34 +20,22 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; -import java.io.ByteArrayInputStream; +import com.google.common.annotations.VisibleForTesting; + import java.io.ByteArrayOutputStream; -import java.io.Closeable; -import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.Channels; import java.nio.channels.GatheringByteChannel; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; -import java.security.GeneralSecurityException; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.concurrent.atomic.LongAdder; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - -import org.apache.commons.crypto.cipher.CryptoCipherFactory; -import org.apache.commons.crypto.random.CryptoRandom; -import org.apache.commons.crypto.random.CryptoRandomFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -59,65 +47,36 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.client.VersionInfoUtil; -import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.RequestTooBigException; -import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.ByteBufferPool; -import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.MultiByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.regionserver.RSRpcServices; -import org.apache.hadoop.hbase.security.AccessDeniedException; -import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; -import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler; -import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler; -import org.apache.hadoop.hbase.security.SaslStatus; -import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.PolicyProvider; -import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.hadoop.security.token.SecretManager; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.htrace.TraceInfo; import org.codehaus.jackson.map.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; - /** * An RPC server that hosts protobuf described Services. * @@ -262,739 +221,6 @@ public abstract class RpcServer implements RpcServerInterface, void run(); } - /** Reads calls from a connection and queues them for handling. */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings( - value="VO_VOLATILE_INCREMENT", - justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") - public abstract class Connection implements Closeable { - // If initial preamble with version and magic has been read or not. - protected boolean connectionPreambleRead = false; - // If the connection header has been read or not. - protected boolean connectionHeaderRead = false; - - protected CallCleanup callCleanup; - - // Cache the remote host & port info so that even if the socket is - // disconnected, we can say where it used to connect to. - protected String hostAddress; - protected int remotePort; - protected InetAddress addr; - protected ConnectionHeader connectionHeader; - - /** - * Codec the client asked use. - */ - protected Codec codec; - /** - * Compression codec the client asked us use. - */ - protected CompressionCodec compressionCodec; - protected BlockingService service; - - protected AuthMethod authMethod; - protected boolean saslContextEstablished; - protected boolean skipInitialSaslHandshake; - private ByteBuffer unwrappedData; - // When is this set? FindBugs wants to know! Says NP - private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4); - protected boolean useSasl; - protected SaslServer saslServer; - protected CryptoAES cryptoAES; - protected boolean useWrap = false; - protected boolean useCryptoAesWrap = false; - // Fake 'call' for failed authorization response - protected static final int AUTHORIZATION_FAILED_CALLID = -1; - protected ServerCall authFailedCall; - protected ByteArrayOutputStream authFailedResponse = - new ByteArrayOutputStream(); - // Fake 'call' for SASL context setup - protected static final int SASL_CALLID = -33; - protected ServerCall saslCall; - // Fake 'call' for connection header response - protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34; - protected ServerCall setConnectionHeaderResponseCall; - - // was authentication allowed with a fallback to simple auth - protected boolean authenticatedWithFallback; - - protected boolean retryImmediatelySupported = false; - - public UserGroupInformation attemptingUser = null; // user name before auth - protected User user = null; - protected UserGroupInformation ugi = null; - - public Connection() { - this.callCleanup = null; - } - - @Override - public String toString() { - return getHostAddress() + ":" + remotePort; - } - - public String getHostAddress() { - return hostAddress; - } - - public InetAddress getHostInetAddress() { - return addr; - } - - public int getRemotePort() { - return remotePort; - } - - public VersionInfo getVersionInfo() { - if (connectionHeader.hasVersionInfo()) { - return connectionHeader.getVersionInfo(); - } - return null; - } - - protected String getFatalConnectionString(final int version, final byte authByte) { - return "serverVersion=" + CURRENT_VERSION + - ", clientVersion=" + version + ", authMethod=" + authByte + - ", authSupported=" + (authMethod != null) + " from " + toString(); - } - - protected UserGroupInformation getAuthorizedUgi(String authorizedId) - throws IOException { - UserGroupInformation authorizedUgi; - if (authMethod == AuthMethod.DIGEST) { - TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId, - secretManager); - authorizedUgi = tokenId.getUser(); - if (authorizedUgi == null) { - throw new AccessDeniedException( - "Can't retrieve username from tokenIdentifier."); - } - authorizedUgi.addTokenIdentifier(tokenId); - } else { - authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId); - } - authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod()); - return authorizedUgi; - } - - /** - * Set up cell block codecs - * @throws FatalConnectionException - */ - protected void setupCellBlockCodecs(final ConnectionHeader header) - throws FatalConnectionException { - // TODO: Plug in other supported decoders. - if (!header.hasCellBlockCodecClass()) return; - String className = header.getCellBlockCodecClass(); - if (className == null || className.length() == 0) return; - try { - this.codec = (Codec)Class.forName(className).newInstance(); - } catch (Exception e) { - throw new UnsupportedCellCodecException(className, e); - } - if (!header.hasCellBlockCompressorClass()) return; - className = header.getCellBlockCompressorClass(); - try { - this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance(); - } catch (Exception e) { - throw new UnsupportedCompressionCodecException(className, e); - } - } - - /** - * Set up cipher for rpc encryption with Apache Commons Crypto - * - * @throws FatalConnectionException - */ - protected void setupCryptoCipher(final ConnectionHeader header, - RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) - throws FatalConnectionException { - // If simple auth, return - if (saslServer == null) return; - // check if rpc encryption with Crypto AES - String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); - boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY - .getSaslQop().equalsIgnoreCase(qop); - boolean isCryptoAesEncryption = isEncryption && conf.getBoolean( - "hbase.rpc.crypto.encryption.aes.enabled", false); - if (!isCryptoAesEncryption) return; - if (!header.hasRpcCryptoCipherTransformation()) return; - String transformation = header.getRpcCryptoCipherTransformation(); - if (transformation == null || transformation.length() == 0) return; - // Negotiates AES based on complete saslServer. - // The Crypto metadata need to be encrypted and send to client. - Properties properties = new Properties(); - // the property for SecureRandomFactory - properties.setProperty(CryptoRandomFactory.CLASSES_KEY, - conf.get("hbase.crypto.sasl.encryption.aes.crypto.random", - "org.apache.commons.crypto.random.JavaCryptoRandom")); - // the property for cipher class - properties.setProperty(CryptoCipherFactory.CLASSES_KEY, - conf.get("hbase.rpc.crypto.encryption.aes.cipher.class", - "org.apache.commons.crypto.cipher.JceCipher")); - - int cipherKeyBits = conf.getInt( - "hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128); - // generate key and iv - if (cipherKeyBits % 8 != 0) { - throw new IllegalArgumentException("The AES cipher key size in bits" + - " should be a multiple of byte"); - } - int len = cipherKeyBits / 8; - byte[] inKey = new byte[len]; - byte[] outKey = new byte[len]; - byte[] inIv = new byte[len]; - byte[] outIv = new byte[len]; - - try { - // generate the cipher meta data with SecureRandom - CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties); - secureRandom.nextBytes(inKey); - secureRandom.nextBytes(outKey); - secureRandom.nextBytes(inIv); - secureRandom.nextBytes(outIv); - - // create CryptoAES for server - cryptoAES = new CryptoAES(transformation, properties, - inKey, outKey, inIv, outIv); - // create SaslCipherMeta and send to client, - // for client, the [inKey, outKey], [inIv, outIv] should be reversed - RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder(); - ccmBuilder.setTransformation(transformation); - ccmBuilder.setInIv(getByteString(outIv)); - ccmBuilder.setInKey(getByteString(outKey)); - ccmBuilder.setOutIv(getByteString(inIv)); - ccmBuilder.setOutKey(getByteString(inKey)); - chrBuilder.setCryptoCipherMeta(ccmBuilder); - useCryptoAesWrap = true; - } catch (GeneralSecurityException | IOException ex) { - throw new UnsupportedCryptoException(ex.getMessage(), ex); - } - } - - private ByteString getByteString(byte[] bytes) { - // return singleton to reduce object allocation - return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes); - } - - protected UserGroupInformation createUser(ConnectionHeader head) { - UserGroupInformation ugi = null; - - if (!head.hasUserInfo()) { - return null; - } - UserInformation userInfoProto = head.getUserInfo(); - String effectiveUser = null; - if (userInfoProto.hasEffectiveUser()) { - effectiveUser = userInfoProto.getEffectiveUser(); - } - String realUser = null; - if (userInfoProto.hasRealUser()) { - realUser = userInfoProto.getRealUser(); - } - if (effectiveUser != null) { - if (realUser != null) { - UserGroupInformation realUserUgi = - UserGroupInformation.createRemoteUser(realUser); - ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi); - } else { - ugi = UserGroupInformation.createRemoteUser(effectiveUser); - } - } - return ugi; - } - - protected void disposeSasl() { - if (saslServer != null) { - try { - saslServer.dispose(); - saslServer = null; - } catch (SaslException ignored) { - // Ignored. This is being disposed of anyway. - } - } - } - - /** - * No protobuf encoding of raw sasl messages - */ - protected void doRawSaslReply(SaslStatus status, Writable rv, - String errorClass, String error) throws IOException { - ByteBufferOutputStream saslResponse = null; - DataOutputStream out = null; - try { - // In my testing, have noticed that sasl messages are usually - // in the ballpark of 100-200. That's why the initial capacity is 256. - saslResponse = new ByteBufferOutputStream(256); - out = new DataOutputStream(saslResponse); - out.writeInt(status.state); // write status - if (status == SaslStatus.SUCCESS) { - rv.write(out); - } else { - WritableUtils.writeString(out, errorClass); - WritableUtils.writeString(out, error); - } - saslCall.setSaslTokenResponse(saslResponse.getByteBuffer()); - saslCall.sendResponseIfReady(); - } finally { - if (saslResponse != null) { - saslResponse.close(); - } - if (out != null) { - out.close(); - } - } - } - - public void saslReadAndProcess(ByteBuff saslToken) throws IOException, - InterruptedException { - if (saslContextEstablished) { - if (LOG.isTraceEnabled()) - LOG.trace("Have read input token of size " + saslToken.limit() - + " for processing by saslServer.unwrap()"); - - if (!useWrap) { - processOneRpc(saslToken); - } else { - byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes(); - byte [] plaintextData; - if (useCryptoAesWrap) { - // unwrap with CryptoAES - plaintextData = cryptoAES.unwrap(b, 0, b.length); - } else { - plaintextData = saslServer.unwrap(b, 0, b.length); - } - processUnwrappedData(plaintextData); - } - } else { - byte[] replyToken; - try { - if (saslServer == null) { - switch (authMethod) { - case DIGEST: - if (secretManager == null) { - throw new AccessDeniedException( - "Server is not configured to do DIGEST authentication."); - } - saslServer = Sasl.createSaslServer(AuthMethod.DIGEST - .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM, - HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler( - secretManager, this)); - break; - default: - UserGroupInformation current = UserGroupInformation.getCurrentUser(); - String fullName = current.getUserName(); - if (LOG.isDebugEnabled()) { - LOG.debug("Kerberos principal name is " + fullName); - } - final String names[] = SaslUtil.splitKerberosName(fullName); - if (names.length != 3) { - throw new AccessDeniedException( - "Kerberos principal name does NOT have the expected " - + "hostname part: " + fullName); - } - current.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws SaslException { - saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS - .getMechanismName(), names[0], names[1], - HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler()); - return null; - } - }); - } - if (saslServer == null) - throw new AccessDeniedException( - "Unable to find SASL server implementation for " - + authMethod.getMechanismName()); - if (LOG.isDebugEnabled()) { - LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName()); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Have read input token of size " + saslToken.limit() - + " for processing by saslServer.evaluateResponse()"); - } - replyToken = saslServer - .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes()); - } catch (IOException e) { - IOException sendToClient = e; - Throwable cause = e; - while (cause != null) { - if (cause instanceof InvalidToken) { - sendToClient = (InvalidToken) cause; - break; - } - cause = cause.getCause(); - } - doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(), - sendToClient.getLocalizedMessage()); - metrics.authenticationFailure(); - String clientIP = this.toString(); - // attempting user could be null - AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser); - throw e; - } - if (replyToken != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Will send token of size " + replyToken.length - + " from saslServer."); - } - doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null, - null); - } - if (saslServer.isComplete()) { - String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); - useWrap = qop != null && !"auth".equalsIgnoreCase(qop); - ugi = getAuthorizedUgi(saslServer.getAuthorizationID()); - if (LOG.isDebugEnabled()) { - LOG.debug("SASL server context established. Authenticated client: " - + ugi + ". Negotiated QoP is " - + saslServer.getNegotiatedProperty(Sasl.QOP)); - } - metrics.authenticationSuccess(); - AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi); - saslContextEstablished = true; - } - } - } - - private void processUnwrappedData(byte[] inBuf) throws IOException, - InterruptedException { - ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf)); - // Read all RPCs contained in the inBuf, even partial ones - while (true) { - int count; - if (unwrappedDataLengthBuffer.remaining() > 0) { - count = channelRead(ch, unwrappedDataLengthBuffer); - if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0) - return; - } - - if (unwrappedData == null) { - unwrappedDataLengthBuffer.flip(); - int unwrappedDataLength = unwrappedDataLengthBuffer.getInt(); - - if (unwrappedDataLength == RpcClient.PING_CALL_ID) { - if (LOG.isDebugEnabled()) - LOG.debug("Received ping message"); - unwrappedDataLengthBuffer.clear(); - continue; // ping message - } - unwrappedData = ByteBuffer.allocate(unwrappedDataLength); - } - - count = channelRead(ch, unwrappedData); - if (count <= 0 || unwrappedData.remaining() > 0) - return; - - if (unwrappedData.remaining() == 0) { - unwrappedDataLengthBuffer.clear(); - unwrappedData.flip(); - processOneRpc(new SingleByteBuff(unwrappedData)); - unwrappedData = null; - } - } - } - - public void processOneRpc(ByteBuff buf) throws IOException, - InterruptedException { - if (connectionHeaderRead) { - processRequest(buf); - } else { - processConnectionHeader(buf); - this.connectionHeaderRead = true; - if (!authorizeConnection()) { - // Throw FatalConnectionException wrapping ACE so client does right thing and closes - // down the connection instead of trying to read non-existent retun. - throw new AccessDeniedException("Connection from " + this + " for service " + - connectionHeader.getServiceName() + " is unauthorized for user: " + ugi); - } - this.user = userProvider.create(this.ugi); - } - } - - protected boolean authorizeConnection() throws IOException { - try { - // If auth method is DIGEST, the token was obtained by the - // real user for the effective user, therefore not required to - // authorize real user. doAs is allowed only for simple or kerberos - // authentication - if (ugi != null && ugi.getRealUser() != null - && (authMethod != AuthMethod.DIGEST)) { - ProxyUsers.authorize(ugi, this.getHostAddress(), conf); - } - authorize(ugi, connectionHeader, getHostInetAddress()); - metrics.authorizationSuccess(); - } catch (AuthorizationException ae) { - if (LOG.isDebugEnabled()) { - LOG.debug("Connection authorization failed: " + ae.getMessage(), ae); - } - metrics.authorizationFailure(); - setupResponse(authFailedResponse, authFailedCall, - new AccessDeniedException(ae), ae.getMessage()); - authFailedCall.sendResponseIfReady(); - return false; - } - return true; - } - - // Reads the connection header following version - protected void processConnectionHeader(ByteBuff buf) throws IOException { - if (buf.hasArray()) { - this.connectionHeader = ConnectionHeader.parseFrom(buf.array()); - } else { - CodedInputStream cis = UnsafeByteOperations - .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput(); - cis.enableAliasing(true); - this.connectionHeader = ConnectionHeader.parseFrom(cis); - } - String serviceName = connectionHeader.getServiceName(); - if (serviceName == null) throw new EmptyServiceNameException(); - this.service = getService(services, serviceName); - if (this.service == null) throw new UnknownServiceException(serviceName); - setupCellBlockCodecs(this.connectionHeader); - RPCProtos.ConnectionHeaderResponse.Builder chrBuilder = - RPCProtos.ConnectionHeaderResponse.newBuilder(); - setupCryptoCipher(this.connectionHeader, chrBuilder); - responseConnectionHeader(chrBuilder); - UserGroupInformation protocolUser = createUser(connectionHeader); - if (!useSasl) { - ugi = protocolUser; - if (ugi != null) { - ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod); - } - // audit logging for SASL authenticated users happens in saslReadAndProcess() - if (authenticatedWithFallback) { - LOG.warn("Allowed fallback to SIMPLE auth for " + ugi - + " connecting from " + getHostAddress()); - } - AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi); - } else { - // user is authenticated - ugi.setAuthenticationMethod(authMethod.authenticationMethod); - //Now we check if this is a proxy user case. If the protocol user is - //different from the 'user', it is a proxy user scenario. However, - //this is not allowed if user authenticated with DIGEST. - if ((protocolUser != null) - && (!protocolUser.getUserName().equals(ugi.getUserName()))) { - if (authMethod == AuthMethod.DIGEST) { - // Not allowed to doAs if token authentication is used - throw new AccessDeniedException("Authenticated user (" + ugi - + ") doesn't match what the client claims to be (" - + protocolUser + ")"); - } else { - // Effective user can be different from authenticated user - // for simple auth or kerberos auth - // The user is the real user. Now we create a proxy user - UserGroupInformation realUser = ugi; - ugi = UserGroupInformation.createProxyUser(protocolUser - .getUserName(), realUser); - // Now the user is a proxy user, set Authentication method Proxy. - ugi.setAuthenticationMethod(AuthenticationMethod.PROXY); - } - } - } - if (connectionHeader.hasVersionInfo()) { - // see if this connection will support RetryImmediatelyException - retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2); - - AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort - + " with version info: " - + TextFormat.shortDebugString(connectionHeader.getVersionInfo())); - } else { - AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort - + " with unknown version info"); - } - } - - private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) - throws FatalConnectionException { - // Response the connection header if Crypto AES is enabled - if (!chrBuilder.hasCryptoCipherMeta()) return; - try { - byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray(); - // encrypt the Crypto AES cipher meta data with sasl server, and send to client - byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4]; - Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4); - Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length); - - doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, unwrapped.length)); - } catch (IOException ex) { - throw new UnsupportedCryptoException(ex.getMessage(), ex); - } - } - - /** - * Send the response for connection header - */ - private void doConnectionHeaderResponse(byte[] wrappedCipherMetaData) - throws IOException { - ByteBufferOutputStream response = null; - DataOutputStream out = null; - try { - response = new ByteBufferOutputStream(wrappedCipherMetaData.length + 4); - out = new DataOutputStream(response); - out.writeInt(wrappedCipherMetaData.length); - out.write(wrappedCipherMetaData); - - setConnectionHeaderResponseCall.setConnectionHeaderResponse(response - .getByteBuffer()); - setConnectionHeaderResponseCall.sendResponseIfReady(); - } finally { - if (out != null) { - out.close(); - } - if (response != null) { - response.close(); - } - } - } - - /** - * @param buf - * Has the request header and the request param and optionally - * encoded data buffer all in this one array. - * @throws IOException - * @throws InterruptedException - */ - protected void processRequest(ByteBuff buf) throws IOException, - InterruptedException { - long totalRequestSize = buf.limit(); - int offset = 0; - // Here we read in the header. We avoid having pb - // do its default 4k allocation for CodedInputStream. We force it to use - // backing array. - CodedInputStream cis; - if (buf.hasArray()) { - cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit()) - .newCodedInput(); - } else { - cis = UnsafeByteOperations.unsafeWrap( - new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()) - .newCodedInput(); - } - cis.enableAliasing(true); - int headerSize = cis.readRawVarint32(); - offset = cis.getTotalBytesRead(); - Message.Builder builder = RequestHeader.newBuilder(); - ProtobufUtil.mergeFrom(builder, cis, headerSize); - RequestHeader header = (RequestHeader) builder.build(); - offset += headerSize; - int id = header.getCallId(); - if (LOG.isTraceEnabled()) { - LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) - + " totalRequestSize: " + totalRequestSize + " bytes"); - } - // Enforcing the call queue size, this triggers a retry in the client - // This is a bit late to be doing this check - we have already read in the - // total request. - if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) { - final ServerCall callTooBig = createCall(id, this.service, null, - null, null, null, this, totalRequestSize, null, null, 0, - this.callCleanup); - ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); - metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); - setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION, - "Call queue is full on " + server.getServerName() - + ", is hbase.ipc.server.max.callqueue.size too small?"); - callTooBig.sendResponseIfReady(); - return; - } - MethodDescriptor md = null; - Message param = null; - CellScanner cellScanner = null; - try { - if (header.hasRequestParam() && header.getRequestParam()) { - md = this.service.getDescriptorForType().findMethodByName( - header.getMethodName()); - if (md == null) - throw new UnsupportedOperationException(header.getMethodName()); - builder = this.service.getRequestPrototype(md).newBuilderForType(); - cis.resetSizeCounter(); - int paramSize = cis.readRawVarint32(); - offset += cis.getTotalBytesRead(); - if (builder != null) { - ProtobufUtil.mergeFrom(builder, cis, paramSize); - param = builder.build(); - } - offset += paramSize; - } else { - // currently header must have request param, so we directly throw - // exception here - String msg = "Invalid request header: " - + TextFormat.shortDebugString(header) - + ", should have param set in it"; - LOG.warn(msg); - throw new DoNotRetryIOException(msg); - } - if (header.hasCellBlockMeta()) { - buf.position(offset); - ByteBuff dup = buf.duplicate(); - dup.limit(offset + header.getCellBlockMeta().getLength()); - cellScanner = cellBlockBuilder.createCellScannerReusingBuffers( - this.codec, this.compressionCodec, dup); - } - } catch (Throwable t) { - InetSocketAddress address = getListenerAddress(); - String msg = (address != null ? address : "(channel closed)") - + " is unable to read call parameter from client " - + getHostAddress(); - LOG.warn(msg, t); - - metrics.exception(t); - - // probably the hbase hadoop version does not match the running hadoop - // version - if (t instanceof LinkageError) { - t = new DoNotRetryIOException(t); - } - // If the method is not present on the server, do not retry. - if (t instanceof UnsupportedOperationException) { - t = new DoNotRetryIOException(t); - } - - final ServerCall readParamsFailedCall = createCall(id, - this.service, null, null, null, null, this, totalRequestSize, null, - null, 0, this.callCleanup); - ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); - setupResponse(responseBuffer, readParamsFailedCall, t, - msg + "; " + t.getMessage()); - readParamsFailedCall.sendResponseIfReady(); - return; - } - - TraceInfo traceInfo = header.hasTraceInfo() ? new TraceInfo(header - .getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()) - : null; - int timeout = 0; - if (header.hasTimeout() && header.getTimeout() > 0) { - timeout = Math.max(minClientRequestTimeout, header.getTimeout()); - } - ServerCall call = createCall(id, this.service, md, header, param, - cellScanner, this, totalRequestSize, traceInfo, this.addr, timeout, - this.callCleanup); - - if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) { - callQueueSizeInBytes.add(-1 * call.getSize()); - - ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); - metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); - setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION, - "Call queue is full on " + server.getServerName() - + ", too many items queued ?"); - call.sendResponseIfReady(); - } - } - - public abstract boolean isConnectionOpen(); - - public abstract ServerCall createCall(int id, final BlockingService service, - final MethodDescriptor md, RequestHeader header, Message param, - CellScanner cellScanner, Connection connection, long size, - TraceInfo tinfo, final InetAddress remoteAddress, int timeout, - CallCleanup reqCleanup); - } - /** * Datastructure for passing a {@link BlockingService} and its associated class of * protobuf service interface. For example, a server that fielded what is defined @@ -1122,7 +348,7 @@ public abstract class RpcServer implements RpcServerInterface, * @param error error message, if the call failed * @throws IOException */ - protected void setupResponse(ByteArrayOutputStream response, ServerCall call, Throwable t, + protected void setupResponse(ByteArrayOutputStream response, ServerCall call, Throwable t, String error) throws IOException { if (response != null) response.reset(); call.setResponse(null, null, t, error); @@ -1574,44 +800,4 @@ public abstract class RpcServer implements RpcServerInterface, public void setRsRpcServices(RSRpcServices rsRpcServices) { this.rsRpcServices = rsRpcServices; } - - protected static class ByteBuffByteInput extends ByteInput { - - private ByteBuff buf; - private int offset; - private int length; - - ByteBuffByteInput(ByteBuff buf, int offset, int length) { - this.buf = buf; - this.offset = offset; - this.length = length; - } - - @Override - public byte read(int offset) { - return this.buf.get(getAbsoluteOffset(offset)); - } - - private int getAbsoluteOffset(int offset) { - return this.offset + offset; - } - - @Override - public int read(int offset, byte[] out, int outOffset, int len) { - this.buf.get(getAbsoluteOffset(offset), out, outOffset, len); - return len; - } - - @Override - public int read(int offset, ByteBuffer out) { - int len = out.remaining(); - this.buf.get(out, getAbsoluteOffset(offset), len); - return len; - } - - @Override - public int size() { - return this.length; - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index 9294839d796..15fe3e6c8eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferPool; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; -import org.apache.hadoop.hbase.ipc.RpcServer.Connection; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; @@ -52,7 +51,7 @@ import org.apache.htrace.TraceInfo; * the result. */ @InterfaceAudience.Private -abstract class ServerCall implements RpcCall { +abstract class ServerCall implements RpcCall { protected final int id; // the client's call id protected final BlockingService service; @@ -61,7 +60,7 @@ abstract class ServerCall implements RpcCall { protected Message param; // the parameter passed // Optional cell data passed outside of protobufs. protected final CellScanner cellScanner; - protected final Connection connection; // connection to client + protected final T connection; // connection to client protected final long receiveTime; // the time received when response is null // the time served when response is not null protected final int timeout; @@ -96,7 +95,7 @@ abstract class ServerCall implements RpcCall { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", justification="Can't figure why this complaint is happening... see below") ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header, - Message param, CellScanner cellScanner, Connection connection, long size, TraceInfo tinfo, + Message param, CellScanner cellScanner, T connection, long size, TraceInfo tinfo, InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) { this.id = id; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java new file mode 100644 index 00000000000..d4ab95c242b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java @@ -0,0 +1,852 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.security.GeneralSecurityException; +import java.security.PrivilegedExceptionAction; +import java.util.Properties; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + +import org.apache.commons.crypto.cipher.CryptoCipherFactory; +import org.apache.commons.crypto.random.CryptoRandom; +import org.apache.commons.crypto.random.CryptoRandomFactory; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.client.VersionInfoUtil; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.io.ByteBufferOutputStream; +import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; +import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; +import org.apache.hadoop.hbase.security.AccessDeniedException; +import org.apache.hadoop.hbase.security.AuthMethod; +import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; +import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler; +import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler; +import org.apache.hadoop.hbase.security.SaslStatus; +import org.apache.hadoop.hbase.security.SaslUtil; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.htrace.TraceInfo; + +/** Reads calls from a connection and queues them for handling. */ +@edu.umd.cs.findbugs.annotations.SuppressWarnings( + value="VO_VOLATILE_INCREMENT", + justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") +abstract class ServerRpcConnection implements Closeable { + /** */ + protected final RpcServer rpcServer; + // If initial preamble with version and magic has been read or not. + protected boolean connectionPreambleRead = false; + // If the connection header has been read or not. + protected boolean connectionHeaderRead = false; + + protected CallCleanup callCleanup; + + // Cache the remote host & port info so that even if the socket is + // disconnected, we can say where it used to connect to. + protected String hostAddress; + protected int remotePort; + protected InetAddress addr; + protected ConnectionHeader connectionHeader; + + /** + * Codec the client asked use. + */ + protected Codec codec; + /** + * Compression codec the client asked us use. + */ + protected CompressionCodec compressionCodec; + protected BlockingService service; + + protected AuthMethod authMethod; + protected boolean saslContextEstablished; + protected boolean skipInitialSaslHandshake; + private ByteBuffer unwrappedData; + // When is this set? FindBugs wants to know! Says NP + private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4); + protected boolean useSasl; + protected SaslServer saslServer; + protected CryptoAES cryptoAES; + protected boolean useWrap = false; + protected boolean useCryptoAesWrap = false; + // Fake 'call' for failed authorization response + protected static final int AUTHORIZATION_FAILED_CALLID = -1; + protected ServerCall authFailedCall; + protected ByteArrayOutputStream authFailedResponse = + new ByteArrayOutputStream(); + // Fake 'call' for SASL context setup + protected static final int SASL_CALLID = -33; + protected ServerCall saslCall; + // Fake 'call' for connection header response + protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34; + protected ServerCall setConnectionHeaderResponseCall; + + // was authentication allowed with a fallback to simple auth + protected boolean authenticatedWithFallback; + + protected boolean retryImmediatelySupported = false; + + private UserGroupInformation attemptingUser = null; // user name before auth + protected User user = null; + protected UserGroupInformation ugi = null; + + public ServerRpcConnection(RpcServer rpcServer) { + this.rpcServer = rpcServer; + this.callCleanup = null; + } + + @Override + public String toString() { + return getHostAddress() + ":" + remotePort; + } + + public String getHostAddress() { + return hostAddress; + } + + public InetAddress getHostInetAddress() { + return addr; + } + + public int getRemotePort() { + return remotePort; + } + + public VersionInfo getVersionInfo() { + if (connectionHeader.hasVersionInfo()) { + return connectionHeader.getVersionInfo(); + } + return null; + } + + protected String getFatalConnectionString(final int version, final byte authByte) { + return "serverVersion=" + RpcServer.CURRENT_VERSION + + ", clientVersion=" + version + ", authMethod=" + authByte + + ", authSupported=" + (authMethod != null) + " from " + toString(); + } + + protected UserGroupInformation getAuthorizedUgi(String authorizedId) + throws IOException { + UserGroupInformation authorizedUgi; + if (authMethod == AuthMethod.DIGEST) { + TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId, + this.rpcServer.secretManager); + authorizedUgi = tokenId.getUser(); + if (authorizedUgi == null) { + throw new AccessDeniedException( + "Can't retrieve username from tokenIdentifier."); + } + authorizedUgi.addTokenIdentifier(tokenId); + } else { + authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId); + } + authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod()); + return authorizedUgi; + } + + /** + * Set up cell block codecs + * @throws FatalConnectionException + */ + protected void setupCellBlockCodecs(final ConnectionHeader header) + throws FatalConnectionException { + // TODO: Plug in other supported decoders. + if (!header.hasCellBlockCodecClass()) return; + String className = header.getCellBlockCodecClass(); + if (className == null || className.length() == 0) return; + try { + this.codec = (Codec)Class.forName(className).newInstance(); + } catch (Exception e) { + throw new UnsupportedCellCodecException(className, e); + } + if (!header.hasCellBlockCompressorClass()) return; + className = header.getCellBlockCompressorClass(); + try { + this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance(); + } catch (Exception e) { + throw new UnsupportedCompressionCodecException(className, e); + } + } + + /** + * Set up cipher for rpc encryption with Apache Commons Crypto + * + * @throws FatalConnectionException + */ + protected void setupCryptoCipher(final ConnectionHeader header, + RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) + throws FatalConnectionException { + // If simple auth, return + if (saslServer == null) return; + // check if rpc encryption with Crypto AES + String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); + boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY + .getSaslQop().equalsIgnoreCase(qop); + boolean isCryptoAesEncryption = isEncryption && this.rpcServer.conf.getBoolean( + "hbase.rpc.crypto.encryption.aes.enabled", false); + if (!isCryptoAesEncryption) return; + if (!header.hasRpcCryptoCipherTransformation()) return; + String transformation = header.getRpcCryptoCipherTransformation(); + if (transformation == null || transformation.length() == 0) return; + // Negotiates AES based on complete saslServer. + // The Crypto metadata need to be encrypted and send to client. + Properties properties = new Properties(); + // the property for SecureRandomFactory + properties.setProperty(CryptoRandomFactory.CLASSES_KEY, + this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random", + "org.apache.commons.crypto.random.JavaCryptoRandom")); + // the property for cipher class + properties.setProperty(CryptoCipherFactory.CLASSES_KEY, + this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class", + "org.apache.commons.crypto.cipher.JceCipher")); + + int cipherKeyBits = this.rpcServer.conf.getInt( + "hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128); + // generate key and iv + if (cipherKeyBits % 8 != 0) { + throw new IllegalArgumentException("The AES cipher key size in bits" + + " should be a multiple of byte"); + } + int len = cipherKeyBits / 8; + byte[] inKey = new byte[len]; + byte[] outKey = new byte[len]; + byte[] inIv = new byte[len]; + byte[] outIv = new byte[len]; + + try { + // generate the cipher meta data with SecureRandom + CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties); + secureRandom.nextBytes(inKey); + secureRandom.nextBytes(outKey); + secureRandom.nextBytes(inIv); + secureRandom.nextBytes(outIv); + + // create CryptoAES for server + cryptoAES = new CryptoAES(transformation, properties, + inKey, outKey, inIv, outIv); + // create SaslCipherMeta and send to client, + // for client, the [inKey, outKey], [inIv, outIv] should be reversed + RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder(); + ccmBuilder.setTransformation(transformation); + ccmBuilder.setInIv(getByteString(outIv)); + ccmBuilder.setInKey(getByteString(outKey)); + ccmBuilder.setOutIv(getByteString(inIv)); + ccmBuilder.setOutKey(getByteString(inKey)); + chrBuilder.setCryptoCipherMeta(ccmBuilder); + useCryptoAesWrap = true; + } catch (GeneralSecurityException | IOException ex) { + throw new UnsupportedCryptoException(ex.getMessage(), ex); + } + } + + private ByteString getByteString(byte[] bytes) { + // return singleton to reduce object allocation + return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes); + } + + protected UserGroupInformation createUser(ConnectionHeader head) { + UserGroupInformation ugi = null; + + if (!head.hasUserInfo()) { + return null; + } + UserInformation userInfoProto = head.getUserInfo(); + String effectiveUser = null; + if (userInfoProto.hasEffectiveUser()) { + effectiveUser = userInfoProto.getEffectiveUser(); + } + String realUser = null; + if (userInfoProto.hasRealUser()) { + realUser = userInfoProto.getRealUser(); + } + if (effectiveUser != null) { + if (realUser != null) { + UserGroupInformation realUserUgi = + UserGroupInformation.createRemoteUser(realUser); + ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi); + } else { + ugi = UserGroupInformation.createRemoteUser(effectiveUser); + } + } + return ugi; + } + + protected void disposeSasl() { + if (saslServer != null) { + try { + saslServer.dispose(); + saslServer = null; + } catch (SaslException ignored) { + // Ignored. This is being disposed of anyway. + } + } + } + + /** + * No protobuf encoding of raw sasl messages + */ + protected void doRawSaslReply(SaslStatus status, Writable rv, + String errorClass, String error) throws IOException { + ByteBufferOutputStream saslResponse = null; + DataOutputStream out = null; + try { + // In my testing, have noticed that sasl messages are usually + // in the ballpark of 100-200. That's why the initial capacity is 256. + saslResponse = new ByteBufferOutputStream(256); + out = new DataOutputStream(saslResponse); + out.writeInt(status.state); // write status + if (status == SaslStatus.SUCCESS) { + rv.write(out); + } else { + WritableUtils.writeString(out, errorClass); + WritableUtils.writeString(out, error); + } + saslCall.setSaslTokenResponse(saslResponse.getByteBuffer()); + saslCall.sendResponseIfReady(); + } finally { + if (saslResponse != null) { + saslResponse.close(); + } + if (out != null) { + out.close(); + } + } + } + + public void saslReadAndProcess(ByteBuff saslToken) throws IOException, + InterruptedException { + if (saslContextEstablished) { + if (RpcServer.LOG.isTraceEnabled()) + RpcServer.LOG.trace("Have read input token of size " + saslToken.limit() + + " for processing by saslServer.unwrap()"); + + if (!useWrap) { + processOneRpc(saslToken); + } else { + byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes(); + byte [] plaintextData; + if (useCryptoAesWrap) { + // unwrap with CryptoAES + plaintextData = cryptoAES.unwrap(b, 0, b.length); + } else { + plaintextData = saslServer.unwrap(b, 0, b.length); + } + processUnwrappedData(plaintextData); + } + } else { + byte[] replyToken; + try { + if (saslServer == null) { + switch (authMethod) { + case DIGEST: + if (this.rpcServer.secretManager == null) { + throw new AccessDeniedException( + "Server is not configured to do DIGEST authentication."); + } + saslServer = Sasl.createSaslServer(AuthMethod.DIGEST + .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM, + HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler( + this.rpcServer.secretManager, ugi -> attemptingUser = ugi)); + break; + default: + UserGroupInformation current = UserGroupInformation.getCurrentUser(); + String fullName = current.getUserName(); + if (RpcServer.LOG.isDebugEnabled()) { + RpcServer.LOG.debug("Kerberos principal name is " + fullName); + } + final String names[] = SaslUtil.splitKerberosName(fullName); + if (names.length != 3) { + throw new AccessDeniedException( + "Kerberos principal name does NOT have the expected " + + "hostname part: " + fullName); + } + current.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws SaslException { + saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS + .getMechanismName(), names[0], names[1], + HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler()); + return null; + } + }); + } + if (saslServer == null) + throw new AccessDeniedException( + "Unable to find SASL server implementation for " + + authMethod.getMechanismName()); + if (RpcServer.LOG.isDebugEnabled()) { + RpcServer.LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName()); + } + } + if (RpcServer.LOG.isDebugEnabled()) { + RpcServer.LOG.debug("Have read input token of size " + saslToken.limit() + + " for processing by saslServer.evaluateResponse()"); + } + replyToken = saslServer + .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes()); + } catch (IOException e) { + IOException sendToClient = e; + Throwable cause = e; + while (cause != null) { + if (cause instanceof InvalidToken) { + sendToClient = (InvalidToken) cause; + break; + } + cause = cause.getCause(); + } + doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(), + sendToClient.getLocalizedMessage()); + this.rpcServer.metrics.authenticationFailure(); + String clientIP = this.toString(); + // attempting user could be null + RpcServer.AUDITLOG.warn(RpcServer.AUTH_FAILED_FOR + clientIP + ":" + attemptingUser); + throw e; + } + if (replyToken != null) { + if (RpcServer.LOG.isDebugEnabled()) { + RpcServer.LOG.debug("Will send token of size " + replyToken.length + + " from saslServer."); + } + doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null, + null); + } + if (saslServer.isComplete()) { + String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); + useWrap = qop != null && !"auth".equalsIgnoreCase(qop); + ugi = getAuthorizedUgi(saslServer.getAuthorizationID()); + if (RpcServer.LOG.isDebugEnabled()) { + RpcServer.LOG.debug("SASL server context established. Authenticated client: " + + ugi + ". Negotiated QoP is " + + saslServer.getNegotiatedProperty(Sasl.QOP)); + } + this.rpcServer.metrics.authenticationSuccess(); + RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi); + saslContextEstablished = true; + } + } + } + + private void processUnwrappedData(byte[] inBuf) throws IOException, + InterruptedException { + ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf)); + // Read all RPCs contained in the inBuf, even partial ones + while (true) { + int count; + if (unwrappedDataLengthBuffer.remaining() > 0) { + count = this.rpcServer.channelRead(ch, unwrappedDataLengthBuffer); + if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0) + return; + } + + if (unwrappedData == null) { + unwrappedDataLengthBuffer.flip(); + int unwrappedDataLength = unwrappedDataLengthBuffer.getInt(); + + if (unwrappedDataLength == RpcClient.PING_CALL_ID) { + if (RpcServer.LOG.isDebugEnabled()) + RpcServer.LOG.debug("Received ping message"); + unwrappedDataLengthBuffer.clear(); + continue; // ping message + } + unwrappedData = ByteBuffer.allocate(unwrappedDataLength); + } + + count = this.rpcServer.channelRead(ch, unwrappedData); + if (count <= 0 || unwrappedData.remaining() > 0) + return; + + if (unwrappedData.remaining() == 0) { + unwrappedDataLengthBuffer.clear(); + unwrappedData.flip(); + processOneRpc(new SingleByteBuff(unwrappedData)); + unwrappedData = null; + } + } + } + + public void processOneRpc(ByteBuff buf) throws IOException, + InterruptedException { + if (connectionHeaderRead) { + processRequest(buf); + } else { + processConnectionHeader(buf); + this.connectionHeaderRead = true; + if (!authorizeConnection()) { + // Throw FatalConnectionException wrapping ACE so client does right thing and closes + // down the connection instead of trying to read non-existent retun. + throw new AccessDeniedException("Connection from " + this + " for service " + + connectionHeader.getServiceName() + " is unauthorized for user: " + ugi); + } + this.user = this.rpcServer.userProvider.create(this.ugi); + } + } + + protected boolean authorizeConnection() throws IOException { + try { + // If auth method is DIGEST, the token was obtained by the + // real user for the effective user, therefore not required to + // authorize real user. doAs is allowed only for simple or kerberos + // authentication + if (ugi != null && ugi.getRealUser() != null + && (authMethod != AuthMethod.DIGEST)) { + ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf); + } + this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress()); + this.rpcServer.metrics.authorizationSuccess(); + } catch (AuthorizationException ae) { + if (RpcServer.LOG.isDebugEnabled()) { + RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae); + } + this.rpcServer.metrics.authorizationFailure(); + this.rpcServer.setupResponse(authFailedResponse, authFailedCall, + new AccessDeniedException(ae), ae.getMessage()); + authFailedCall.sendResponseIfReady(); + return false; + } + return true; + } + + // Reads the connection header following version + protected void processConnectionHeader(ByteBuff buf) throws IOException { + if (buf.hasArray()) { + this.connectionHeader = ConnectionHeader.parseFrom(buf.array()); + } else { + CodedInputStream cis = UnsafeByteOperations + .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput(); + cis.enableAliasing(true); + this.connectionHeader = ConnectionHeader.parseFrom(cis); + } + String serviceName = connectionHeader.getServiceName(); + if (serviceName == null) throw new EmptyServiceNameException(); + this.service = RpcServer.getService(this.rpcServer.services, serviceName); + if (this.service == null) throw new UnknownServiceException(serviceName); + setupCellBlockCodecs(this.connectionHeader); + RPCProtos.ConnectionHeaderResponse.Builder chrBuilder = + RPCProtos.ConnectionHeaderResponse.newBuilder(); + setupCryptoCipher(this.connectionHeader, chrBuilder); + responseConnectionHeader(chrBuilder); + UserGroupInformation protocolUser = createUser(connectionHeader); + if (!useSasl) { + ugi = protocolUser; + if (ugi != null) { + ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod); + } + // audit logging for SASL authenticated users happens in saslReadAndProcess() + if (authenticatedWithFallback) { + RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for " + ugi + + " connecting from " + getHostAddress()); + } + RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi); + } else { + // user is authenticated + ugi.setAuthenticationMethod(authMethod.authenticationMethod); + //Now we check if this is a proxy user case. If the protocol user is + //different from the 'user', it is a proxy user scenario. However, + //this is not allowed if user authenticated with DIGEST. + if ((protocolUser != null) + && (!protocolUser.getUserName().equals(ugi.getUserName()))) { + if (authMethod == AuthMethod.DIGEST) { + // Not allowed to doAs if token authentication is used + throw new AccessDeniedException("Authenticated user (" + ugi + + ") doesn't match what the client claims to be (" + + protocolUser + ")"); + } else { + // Effective user can be different from authenticated user + // for simple auth or kerberos auth + // The user is the real user. Now we create a proxy user + UserGroupInformation realUser = ugi; + ugi = UserGroupInformation.createProxyUser(protocolUser + .getUserName(), realUser); + // Now the user is a proxy user, set Authentication method Proxy. + ugi.setAuthenticationMethod(AuthenticationMethod.PROXY); + } + } + } + if (connectionHeader.hasVersionInfo()) { + // see if this connection will support RetryImmediatelyException + retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2); + + RpcServer.AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort + + " with version info: " + + TextFormat.shortDebugString(connectionHeader.getVersionInfo())); + } else { + RpcServer.AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort + + " with unknown version info"); + } + } + + private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) + throws FatalConnectionException { + // Response the connection header if Crypto AES is enabled + if (!chrBuilder.hasCryptoCipherMeta()) return; + try { + byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray(); + // encrypt the Crypto AES cipher meta data with sasl server, and send to client + byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4]; + Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4); + Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length); + + doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, unwrapped.length)); + } catch (IOException ex) { + throw new UnsupportedCryptoException(ex.getMessage(), ex); + } + } + + /** + * Send the response for connection header + */ + private void doConnectionHeaderResponse(byte[] wrappedCipherMetaData) + throws IOException { + ByteBufferOutputStream response = null; + DataOutputStream out = null; + try { + response = new ByteBufferOutputStream(wrappedCipherMetaData.length + 4); + out = new DataOutputStream(response); + out.writeInt(wrappedCipherMetaData.length); + out.write(wrappedCipherMetaData); + + setConnectionHeaderResponseCall.setConnectionHeaderResponse(response + .getByteBuffer()); + setConnectionHeaderResponseCall.sendResponseIfReady(); + } finally { + if (out != null) { + out.close(); + } + if (response != null) { + response.close(); + } + } + } + + /** + * @param buf + * Has the request header and the request param and optionally + * encoded data buffer all in this one array. + * @throws IOException + * @throws InterruptedException + */ + protected void processRequest(ByteBuff buf) throws IOException, + InterruptedException { + long totalRequestSize = buf.limit(); + int offset = 0; + // Here we read in the header. We avoid having pb + // do its default 4k allocation for CodedInputStream. We force it to use + // backing array. + CodedInputStream cis; + if (buf.hasArray()) { + cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit()).newCodedInput(); + } else { + cis = UnsafeByteOperations + .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput(); + } + cis.enableAliasing(true); + int headerSize = cis.readRawVarint32(); + offset = cis.getTotalBytesRead(); + Message.Builder builder = RequestHeader.newBuilder(); + ProtobufUtil.mergeFrom(builder, cis, headerSize); + RequestHeader header = (RequestHeader) builder.build(); + offset += headerSize; + int id = header.getCallId(); + if (RpcServer.LOG.isTraceEnabled()) { + RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) + + " totalRequestSize: " + totalRequestSize + " bytes"); + } + // Enforcing the call queue size, this triggers a retry in the client + // This is a bit late to be doing this check - we have already read in the + // total request. + if ((totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) { + final ServerCall callTooBig = createCall(id, this.service, null, null, null, null, + totalRequestSize, null, null, 0, this.callCleanup); + ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); + this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); + this.rpcServer.setupResponse(responseBuffer, callTooBig, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, + "Call queue is full on " + this.rpcServer.server.getServerName() + + ", is hbase.ipc.server.max.callqueue.size too small?"); + callTooBig.sendResponseIfReady(); + return; + } + MethodDescriptor md = null; + Message param = null; + CellScanner cellScanner = null; + try { + if (header.hasRequestParam() && header.getRequestParam()) { + md = this.service.getDescriptorForType().findMethodByName( + header.getMethodName()); + if (md == null) + throw new UnsupportedOperationException(header.getMethodName()); + builder = this.service.getRequestPrototype(md).newBuilderForType(); + cis.resetSizeCounter(); + int paramSize = cis.readRawVarint32(); + offset += cis.getTotalBytesRead(); + if (builder != null) { + ProtobufUtil.mergeFrom(builder, cis, paramSize); + param = builder.build(); + } + offset += paramSize; + } else { + // currently header must have request param, so we directly throw + // exception here + String msg = "Invalid request header: " + + TextFormat.shortDebugString(header) + + ", should have param set in it"; + RpcServer.LOG.warn(msg); + throw new DoNotRetryIOException(msg); + } + if (header.hasCellBlockMeta()) { + buf.position(offset); + ByteBuff dup = buf.duplicate(); + dup.limit(offset + header.getCellBlockMeta().getLength()); + cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers( + this.codec, this.compressionCodec, dup); + } + } catch (Throwable t) { + InetSocketAddress address = this.rpcServer.getListenerAddress(); + String msg = (address != null ? address : "(channel closed)") + + " is unable to read call parameter from client " + + getHostAddress(); + RpcServer.LOG.warn(msg, t); + + this.rpcServer.metrics.exception(t); + + // probably the hbase hadoop version does not match the running hadoop + // version + if (t instanceof LinkageError) { + t = new DoNotRetryIOException(t); + } + // If the method is not present on the server, do not retry. + if (t instanceof UnsupportedOperationException) { + t = new DoNotRetryIOException(t); + } + + final ServerCall readParamsFailedCall = createCall(id, this.service, null, null, null, null, + totalRequestSize, null, null, 0, this.callCleanup); + ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); + this.rpcServer.setupResponse(responseBuffer, readParamsFailedCall, t, + msg + "; " + t.getMessage()); + readParamsFailedCall.sendResponseIfReady(); + return; + } + + TraceInfo traceInfo = header.hasTraceInfo() ? new TraceInfo(header + .getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()) + : null; + int timeout = 0; + if (header.hasTimeout() && header.getTimeout() > 0) { + timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout()); + } + ServerCall call = createCall(id, this.service, md, header, param, cellScanner, totalRequestSize, + traceInfo, this.addr, timeout, this.callCleanup); + + if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) { + this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize()); + + ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); + this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); + this.rpcServer.setupResponse(responseBuffer, call, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, + "Call queue is full on " + this.rpcServer.server.getServerName() + + ", too many items queued ?"); + call.sendResponseIfReady(); + } + } + + public abstract boolean isConnectionOpen(); + + public abstract ServerCall createCall(int id, BlockingService service, MethodDescriptor md, + RequestHeader header, Message param, CellScanner cellScanner, long size, TraceInfo tinfo, + InetAddress remoteAddress, int timeout, CallCleanup reqCleanup); + + private static class ByteBuffByteInput extends ByteInput { + + private ByteBuff buf; + private int offset; + private int length; + + ByteBuffByteInput(ByteBuff buf, int offset, int length) { + this.buf = buf; + this.offset = offset; + this.length = length; + } + + @Override + public byte read(int offset) { + return this.buf.get(getAbsoluteOffset(offset)); + } + + private int getAbsoluteOffset(int offset) { + return this.offset + offset; + } + + @Override + public int read(int offset, byte[] out, int outOffset, int len) { + this.buf.get(getAbsoluteOffset(offset), out, outOffset, len); + return len; + } + + @Override + public int read(int offset, ByteBuffer out) { + int len = out.remaining(); + this.buf.get(out, getAbsoluteOffset(offset), len); + return len; + } + + @Override + public int size() { + return this.length; + } + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java index 59d1ff9398c..481b7012652 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java @@ -15,29 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.ipc; import java.io.IOException; -import java.io.InputStream; import java.net.BindException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; -import java.net.Socket; import java.net.SocketException; import java.net.UnknownHostException; -import java.nio.ByteBuffer; import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedChannelException; import java.nio.channels.GatheringByteChannel; -import java.nio.channels.ReadableByteChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -45,47 +36,26 @@ import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.LongAdder; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.client.VersionInfoUtil; -import org.apache.hadoop.hbase.exceptions.RequestTooBigException; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -import org.apache.hadoop.hbase.nio.ByteBuff; -import org.apache.hadoop.hbase.nio.SingleByteBuff; -import org.apache.hadoop.hbase.security.AccessDeniedException; -import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.hadoop.hbase.security.HBasePolicyProvider; -import org.apache.hadoop.hbase.security.SaslStatus; -import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; -import org.apache.hadoop.util.StringUtils; -import org.apache.htrace.TraceInfo; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -125,7 +95,7 @@ public class SimpleRpcServer extends RpcServer { // maintains the set of client connections and handles idle timeouts private ConnectionManager connectionManager; private Listener listener = null; - protected Responder responder = null; + protected SimpleRpcServerResponder responder = null; /** Listens on the socket. Creates jobs for the handler threads*/ private class Listener extends Thread { @@ -178,7 +148,7 @@ public class SimpleRpcServer extends RpcServer { private class Reader implements Runnable { - final private LinkedBlockingQueue pendingConnections; + final private LinkedBlockingQueue pendingConnections; private final Selector readSelector; Reader() throws IOException { @@ -206,7 +176,7 @@ public class SimpleRpcServer extends RpcServer { // unbridled acceptance of connections that starves the select int size = pendingConnections.size(); for (int i=size; i>0; i--) { - Connection conn = pendingConnections.take(); + SimpleServerRpcConnection conn = pendingConnections.take(); conn.channel.register(readSelector, SelectionKey.OP_READ, conn); } readSelector.select(); @@ -238,7 +208,7 @@ public class SimpleRpcServer extends RpcServer { * so the connection must be queued. The reader will drain the queue * and update its readSelector before performing the next select */ - public void addConnection(Connection conn) throws IOException { + public void addConnection(SimpleServerRpcConnection conn) throws IOException { pendingConnections.add(conn); readSelector.wakeup(); } @@ -314,7 +284,7 @@ public class SimpleRpcServer extends RpcServer { private void closeCurrentConnection(SelectionKey key, Throwable e) { if (key != null) { - Connection c = (Connection)key.attachment(); + SimpleServerRpcConnection c = (SimpleServerRpcConnection)key.attachment(); if (c != null) { closeConnection(c); key.attach(null); @@ -334,7 +304,7 @@ public class SimpleRpcServer extends RpcServer { channel.socket().setTcpNoDelay(tcpNoDelay); channel.socket().setKeepAlive(tcpKeepAlive); Reader reader = getReader(); - Connection c = connectionManager.register(channel); + SimpleServerRpcConnection c = connectionManager.register(channel); // If the connectionManager can't take it, close the connection. if (c == null) { if (channel.isOpen()) { @@ -349,7 +319,7 @@ public class SimpleRpcServer extends RpcServer { void doRead(SelectionKey key) throws InterruptedException { int count; - Connection c = (Connection) key.attachment(); + SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment(); if (c == null) { return; } @@ -396,649 +366,6 @@ public class SimpleRpcServer extends RpcServer { } } - // Sends responses of RPC back to clients. - protected class Responder extends Thread { - private final Selector writeSelector; - private final Set writingCons = - Collections.newSetFromMap(new ConcurrentHashMap()); - - Responder() throws IOException { - this.setName("RpcServer.responder"); - this.setDaemon(true); - this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER); - writeSelector = Selector.open(); // create a selector - } - - @Override - public void run() { - LOG.debug(getName() + ": starting"); - try { - doRunLoop(); - } finally { - LOG.info(getName() + ": stopping"); - try { - writeSelector.close(); - } catch (IOException ioe) { - LOG.error(getName() + ": couldn't close write selector", ioe); - } - } - } - - /** - * Take the list of the connections that want to write, and register them - * in the selector. - */ - private void registerWrites() { - Iterator it = writingCons.iterator(); - while (it.hasNext()) { - Connection c = it.next(); - it.remove(); - SelectionKey sk = c.channel.keyFor(writeSelector); - try { - if (sk == null) { - try { - c.channel.register(writeSelector, SelectionKey.OP_WRITE, c); - } catch (ClosedChannelException e) { - // ignore: the client went away. - if (LOG.isTraceEnabled()) LOG.trace("ignored", e); - } - } else { - sk.interestOps(SelectionKey.OP_WRITE); - } - } catch (CancelledKeyException e) { - // ignore: the client went away. - if (LOG.isTraceEnabled()) LOG.trace("ignored", e); - } - } - } - - /** - * Add a connection to the list that want to write, - */ - public void registerForWrite(Connection c) { - if (writingCons.add(c)) { - writeSelector.wakeup(); - } - } - - private void doRunLoop() { - long lastPurgeTime = 0; // last check for old calls. - while (running) { - try { - registerWrites(); - int keyCt = writeSelector.select(purgeTimeout); - if (keyCt == 0) { - continue; - } - - Set keys = writeSelector.selectedKeys(); - Iterator iter = keys.iterator(); - while (iter.hasNext()) { - SelectionKey key = iter.next(); - iter.remove(); - try { - if (key.isValid() && key.isWritable()) { - doAsyncWrite(key); - } - } catch (IOException e) { - LOG.debug(getName() + ": asyncWrite", e); - } - } - - lastPurgeTime = purge(lastPurgeTime); - - } catch (OutOfMemoryError e) { - if (errorHandler != null) { - if (errorHandler.checkOOME(e)) { - LOG.info(getName() + ": exiting on OutOfMemoryError"); - return; - } - } else { - // - // we can run out of memory if we have too many threads - // log the event and sleep for a minute and give - // some thread(s) a chance to finish - // - LOG.warn(getName() + ": OutOfMemoryError in server select", e); - try { - Thread.sleep(60000); - } catch (InterruptedException ex) { - LOG.debug("Interrupted while sleeping"); - return; - } - } - } catch (Exception e) { - LOG.warn(getName() + ": exception in Responder " + - StringUtils.stringifyException(e), e); - } - } - LOG.info(getName() + ": stopped"); - } - - /** - * If there were some calls that have not been sent out for a - * long time, we close the connection. - * @return the time of the purge. - */ - private long purge(long lastPurgeTime) { - long now = System.currentTimeMillis(); - if (now < lastPurgeTime + purgeTimeout) { - return lastPurgeTime; - } - - ArrayList conWithOldCalls = new ArrayList<>(); - // get the list of channels from list of keys. - synchronized (writeSelector.keys()) { - for (SelectionKey key : writeSelector.keys()) { - Connection connection = (Connection) key.attachment(); - if (connection == null) { - throw new IllegalStateException("Coding error: SelectionKey key without attachment."); - } - SimpleServerCall call = connection.responseQueue.peekFirst(); - if (call != null && now > call.lastSentTime + purgeTimeout) { - conWithOldCalls.add(call.getConnection()); - } - } - } - - // Seems safer to close the connection outside of the synchronized loop... - for (Connection connection : conWithOldCalls) { - closeConnection(connection); - } - - return now; - } - - private void doAsyncWrite(SelectionKey key) throws IOException { - Connection connection = (Connection) key.attachment(); - if (connection == null) { - throw new IOException("doAsyncWrite: no connection"); - } - if (key.channel() != connection.channel) { - throw new IOException("doAsyncWrite: bad channel"); - } - - if (processAllResponses(connection)) { - try { - // We wrote everything, so we don't need to be told when the socket is ready for - // write anymore. - key.interestOps(0); - } catch (CancelledKeyException e) { - /* The Listener/reader might have closed the socket. - * We don't explicitly cancel the key, so not sure if this will - * ever fire. - * This warning could be removed. - */ - LOG.warn("Exception while changing ops : " + e); - } - } - } - - /** - * Process the response for this call. You need to have the lock on - * {@link org.apache.hadoop.hbase.ipc.SimpleRpcServer.Connection#responseWriteLock} - * - * @param call the call - * @return true if we proceed the call fully, false otherwise. - * @throws IOException - */ - private boolean processResponse(final SimpleServerCall call) throws IOException { - boolean error = true; - try { - // Send as much data as we can in the non-blocking fashion - long numBytes = channelWrite(call.getConnection().channel, - call.response); - if (numBytes < 0) { - throw new HBaseIOException("Error writing on the socket " + - "for the call:" + call.toShortString()); - } - error = false; - } finally { - if (error) { - LOG.debug(getName() + call.toShortString() + ": output error -- closing"); - // We will be closing this connection itself. Mark this call as done so that all the - // buffer(s) it got from pool can get released - call.done(); - closeConnection(call.getConnection()); - } - } - - if (!call.response.hasRemaining()) { - call.done(); - return true; - } else { - return false; // Socket can't take more, we will have to come back. - } - } - - /** - * Process all the responses for this connection - * - * @return true if all the calls were processed or that someone else is doing it. - * false if there * is still some work to do. In this case, we expect the caller to - * delay us. - * @throws IOException - */ - private boolean processAllResponses(final Connection connection) throws IOException { - // We want only one writer on the channel for a connection at a time. - connection.responseWriteLock.lock(); - try { - for (int i = 0; i < 20; i++) { - // protection if some handlers manage to need all the responder - SimpleServerCall call = connection.responseQueue.pollFirst(); - if (call == null) { - return true; - } - if (!processResponse(call)) { - connection.responseQueue.addFirst(call); - return false; - } - } - } finally { - connection.responseWriteLock.unlock(); - } - - return connection.responseQueue.isEmpty(); - } - - // - // Enqueue a response from the application. - // - void doRespond(SimpleServerCall call) throws IOException { - boolean added = false; - - // If there is already a write in progress, we don't wait. This allows to free the handlers - // immediately for other tasks. - if (call.getConnection().responseQueue.isEmpty() - && call.getConnection().responseWriteLock.tryLock()) { - try { - if (call.getConnection().responseQueue.isEmpty()) { - // If we're alone, we can try to do a direct call to the socket. It's - // an optimisation to save on context switches and data transfer between cores.. - if (processResponse(call)) { - return; // we're done. - } - // Too big to fit, putting ahead. - call.getConnection().responseQueue.addFirst(call); - added = true; // We will register to the selector later, outside of the lock. - } - } finally { - call.getConnection().responseWriteLock.unlock(); - } - } - - if (!added) { - call.getConnection().responseQueue.addLast(call); - } - call.responder.registerForWrite(call.getConnection()); - - // set the serve time when the response has to be sent later - call.lastSentTime = System.currentTimeMillis(); - } - } - - /** Reads calls from a connection and queues them for handling. */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings( - value="VO_VOLATILE_INCREMENT", - justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") - public class Connection extends RpcServer.Connection { - - protected SocketChannel channel; - private ByteBuff data; - private ByteBuffer dataLengthBuffer; - protected final ConcurrentLinkedDeque responseQueue = new ConcurrentLinkedDeque<>(); - private final Lock responseWriteLock = new ReentrantLock(); - private LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs - private long lastContact; - protected Socket socket; - - public Connection(SocketChannel channel, long lastContact) { - super(); - this.channel = channel; - this.lastContact = lastContact; - this.data = null; - this.dataLengthBuffer = ByteBuffer.allocate(4); - this.socket = channel.socket(); - this.addr = socket.getInetAddress(); - if (addr == null) { - this.hostAddress = "*Unknown*"; - } else { - this.hostAddress = addr.getHostAddress(); - } - this.remotePort = socket.getPort(); - if (socketSendBufferSize != 0) { - try { - socket.setSendBufferSize(socketSendBufferSize); - } catch (IOException e) { - LOG.warn("Connection: unable to set socket send buffer size to " + - socketSendBufferSize); - } - } - this.saslCall = new SimpleServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null, - null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null, responder); - this.setConnectionHeaderResponseCall = new SimpleServerCall(CONNECTION_HEADER_RESPONSE_CALLID, - null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0, - reservoir, cellBlockBuilder, null, responder); - this.authFailedCall = new SimpleServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, - null, null, this, 0, null, null, System.currentTimeMillis(), 0, reservoir, - cellBlockBuilder, null, responder); - } - - public void setLastContact(long lastContact) { - this.lastContact = lastContact; - } - - public long getLastContact() { - return lastContact; - } - - /* Return true if the connection has no outstanding rpc */ - private boolean isIdle() { - return rpcCount.sum() == 0; - } - - /* Decrement the outstanding RPC count */ - protected void decRpcCount() { - rpcCount.decrement(); - } - - /* Increment the outstanding RPC count */ - protected void incRpcCount() { - rpcCount.increment(); - } - - private int readPreamble() throws IOException { - int count; - // Check for 'HBas' magic. - this.dataLengthBuffer.flip(); - if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) { - return doBadPreambleHandling("Expected HEADER=" + - Bytes.toStringBinary(HConstants.RPC_HEADER) + - " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) + - " from " + toString()); - } - // Now read the next two bytes, the version and the auth to use. - ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2); - count = channelRead(channel, versionAndAuthBytes); - if (count < 0 || versionAndAuthBytes.remaining() > 0) { - return count; - } - int version = versionAndAuthBytes.get(0); - byte authbyte = versionAndAuthBytes.get(1); - this.authMethod = AuthMethod.valueOf(authbyte); - if (version != CURRENT_VERSION) { - String msg = getFatalConnectionString(version, authbyte); - return doBadPreambleHandling(msg, new WrongVersionException(msg)); - } - if (authMethod == null) { - String msg = getFatalConnectionString(version, authbyte); - return doBadPreambleHandling(msg, new BadAuthException(msg)); - } - if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) { - if (allowFallbackToSimpleAuth) { - metrics.authenticationFallback(); - authenticatedWithFallback = true; - } else { - AccessDeniedException ae = new AccessDeniedException("Authentication is required"); - setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage()); - authFailedCall.sendResponseIfReady(); - throw ae; - } - } - if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) { - doRawSaslReply(SaslStatus.SUCCESS, new IntWritable( - SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null); - authMethod = AuthMethod.SIMPLE; - // client has already sent the initial Sasl message and we - // should ignore it. Both client and server should fall back - // to simple auth from now on. - skipInitialSaslHandshake = true; - } - if (authMethod != AuthMethod.SIMPLE) { - useSasl = true; - } - - dataLengthBuffer.clear(); - connectionPreambleRead = true; - return count; - } - - private int read4Bytes() throws IOException { - if (this.dataLengthBuffer.remaining() > 0) { - return channelRead(channel, this.dataLengthBuffer); - } else { - return 0; - } - } - - /** - * Read off the wire. If there is not enough data to read, update the connection state with - * what we have and returns. - * @return Returns -1 if failure (and caller will close connection), else zero or more. - * @throws IOException - * @throws InterruptedException - */ - public int readAndProcess() throws IOException, InterruptedException { - // Try and read in an int. If new connection, the int will hold the 'HBas' HEADER. If it - // does, read in the rest of the connection preamble, the version and the auth method. - // Else it will be length of the data to read (or -1 if a ping). We catch the integer - // length into the 4-byte this.dataLengthBuffer. - int count = read4Bytes(); - if (count < 0 || dataLengthBuffer.remaining() > 0) { - return count; - } - - // If we have not read the connection setup preamble, look to see if that is on the wire. - if (!connectionPreambleRead) { - count = readPreamble(); - if (!connectionPreambleRead) { - return count; - } - - count = read4Bytes(); - if (count < 0 || dataLengthBuffer.remaining() > 0) { - return count; - } - } - - // We have read a length and we have read the preamble. It is either the connection header - // or it is a request. - if (data == null) { - dataLengthBuffer.flip(); - int dataLength = dataLengthBuffer.getInt(); - if (dataLength == RpcClient.PING_CALL_ID) { - if (!useWrap) { //covers the !useSasl too - dataLengthBuffer.clear(); - return 0; //ping message - } - } - if (dataLength < 0) { // A data length of zero is legal. - throw new DoNotRetryIOException("Unexpected data length " - + dataLength + "!! from " + getHostAddress()); - } - - if (dataLength > maxRequestSize) { - String msg = "RPC data length of " + dataLength + " received from " - + getHostAddress() + " is greater than max allowed " - + maxRequestSize + ". Set \"" + MAX_REQUEST_SIZE - + "\" on server to override this limit (not recommended)"; - LOG.warn(msg); - - if (connectionHeaderRead && connectionPreambleRead) { - incRpcCount(); - // Construct InputStream for the non-blocking SocketChannel - // We need the InputStream because we want to read only the request header - // instead of the whole rpc. - ByteBuffer buf = ByteBuffer.allocate(1); - InputStream is = new InputStream() { - @Override - public int read() throws IOException { - channelRead(channel, buf); - buf.flip(); - int x = buf.get(); - buf.flip(); - return x; - } - }; - CodedInputStream cis = CodedInputStream.newInstance(is); - int headerSize = cis.readRawVarint32(); - Message.Builder builder = RequestHeader.newBuilder(); - ProtobufUtil.mergeFrom(builder, cis, headerSize); - RequestHeader header = (RequestHeader) builder.build(); - - // Notify the client about the offending request - SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, - null, null, null, null, this, 0, null, this.addr, System.currentTimeMillis(), 0, - reservoir, cellBlockBuilder, null, responder); - metrics.exception(REQUEST_TOO_BIG_EXCEPTION); - // Make sure the client recognizes the underlying exception - // Otherwise, throw a DoNotRetryIOException. - if (VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(), - RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) { - setupResponse(null, reqTooBig, REQUEST_TOO_BIG_EXCEPTION, msg); - } else { - setupResponse(null, reqTooBig, new DoNotRetryIOException(), msg); - } - // We are going to close the connection, make sure we process the response - // before that. In rare case when this fails, we still close the connection. - responseWriteLock.lock(); - responder.processResponse(reqTooBig); - responseWriteLock.unlock(); - } - // Close the connection - return -1; - } - - // Initialize this.data with a ByteBuff. - // This call will allocate a ByteBuff to read request into and assign to this.data - // Also when we use some buffer(s) from pool, it will create a CallCleanup instance also and - // assign to this.callCleanup - initByteBuffToReadInto(dataLength); - - // Increment the rpc count. This counter will be decreased when we write - // the response. If we want the connection to be detected as idle properly, we - // need to keep the inc / dec correct. - incRpcCount(); - } - - count = channelDataRead(channel, data); - - if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0 - process(); - } - - return count; - } - - // It creates the ByteBuff and CallCleanup and assign to Connection instance. - private void initByteBuffToReadInto(int length) { - // We create random on heap buffers are read into those when - // 1. ByteBufferPool is not there. - // 2. When the size of the req is very small. Using a large sized (64 KB) buffer from pool is - // waste then. Also if all the reqs are of this size, we will be creating larger sized - // buffers and pool them permanently. This include Scan/Get request and DDL kind of reqs like - // RegionOpen. - // 3. If it is an initial handshake signal or initial connection request. Any way then - // condition 2 itself will match - // 4. When SASL use is ON. - if (reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead || useSasl - || length < minSizeForReservoirUse) { - this.data = new SingleByteBuff(ByteBuffer.allocate(length)); - } else { - Pair pair = RpcServer.allocateByteBuffToReadInto(reservoir, - minSizeForReservoirUse, length); - this.data = pair.getFirst(); - this.callCleanup = pair.getSecond(); - } - } - - protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException { - int count = buf.read(channel); - if (count > 0) { - metrics.receivedBytes(count); - } - return count; - } - - /** - * Process the data buffer and clean the connection state for the next call. - */ - private void process() throws IOException, InterruptedException { - data.rewind(); - try { - if (skipInitialSaslHandshake) { - skipInitialSaslHandshake = false; - return; - } - - if (useSasl) { - saslReadAndProcess(data); - } else { - processOneRpc(data); - } - - } finally { - dataLengthBuffer.clear(); // Clean for the next call - data = null; // For the GC - this.callCleanup = null; - } - } - - private int doBadPreambleHandling(final String msg) throws IOException { - return doBadPreambleHandling(msg, new FatalConnectionException(msg)); - } - - private int doBadPreambleHandling(final String msg, final Exception e) throws IOException { - LOG.warn(msg); - SimpleServerCall fakeCall = new SimpleServerCall(-1, null, null, null, null, null, this, -1, - null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null, responder); - setupResponse(null, fakeCall, e, msg); - responder.doRespond(fakeCall); - // Returning -1 closes out the connection. - return -1; - } - - @Override - public synchronized void close() { - disposeSasl(); - data = null; - callCleanup = null; - if (!channel.isOpen()) - return; - try {socket.shutdownOutput();} catch(Exception ignored) { - if (LOG.isTraceEnabled()) { - LOG.trace("Ignored exception", ignored); - } - } - if (channel.isOpen()) { - try {channel.close();} catch(Exception ignored) {} - } - try { - socket.close(); - } catch(Exception ignored) { - if (LOG.isTraceEnabled()) { - LOG.trace("Ignored exception", ignored); - } - } - } - - @Override - public boolean isConnectionOpen() { - return channel.isOpen(); - } - - @Override - public ServerCall createCall(int id, final BlockingService service, final MethodDescriptor md, - RequestHeader header, Message param, CellScanner cellScanner, - RpcServer.Connection connection, long size, TraceInfo tinfo, - final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { - return new SimpleServerCall(id, service, md, header, param, cellScanner, connection, size, - tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder, - reqCleanup, responder); - } - } - - /** * Constructs a server listening on the named port and address. * @param server hosting instance of {@link Server}. We will do authentications if an @@ -1065,7 +392,7 @@ public class SimpleRpcServer extends RpcServer { this.port = listener.getAddress().getPort(); // Create the responder here - responder = new Responder(); + responder = new SimpleRpcServerResponder(this); connectionManager = new ConnectionManager(); initReconfigurable(conf); @@ -1076,11 +403,11 @@ public class SimpleRpcServer extends RpcServer { * Subclasses of HBaseServer can override this to provide their own * Connection implementations. */ - protected Connection getConnection(SocketChannel channel, long time) { - return new Connection(channel, time); + protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) { + return new SimpleServerRpcConnection(this, channel, time); } - protected void closeConnection(Connection connection) { + protected void closeConnection(SimpleServerRpcConnection connection) { connectionManager.close(connection); } @@ -1228,7 +555,7 @@ public class SimpleRpcServer extends RpcServer { private class ConnectionManager { final private AtomicInteger count = new AtomicInteger(); - final private Set connections; + final private Set connections; final private Timer idleScanTimer; final private int idleScanThreshold; @@ -1250,11 +577,11 @@ public class SimpleRpcServer extends RpcServer { // create a set with concurrency -and- a thread-safe iterator, add 2 // for listener and idle closer threads this.connections = Collections.newSetFromMap( - new ConcurrentHashMap( + new ConcurrentHashMap( maxConnectionQueueSize, 0.75f, readThreads+2)); } - private boolean add(Connection connection) { + private boolean add(SimpleServerRpcConnection connection) { boolean added = connections.add(connection); if (added) { count.getAndIncrement(); @@ -1262,7 +589,7 @@ public class SimpleRpcServer extends RpcServer { return added; } - private boolean remove(Connection connection) { + private boolean remove(SimpleServerRpcConnection connection) { boolean removed = connections.remove(connection); if (removed) { count.getAndDecrement(); @@ -1274,12 +601,12 @@ public class SimpleRpcServer extends RpcServer { return count.get(); } - Connection[] toArray() { - return connections.toArray(new Connection[0]); + SimpleServerRpcConnection[] toArray() { + return connections.toArray(new SimpleServerRpcConnection[0]); } - Connection register(SocketChannel channel) { - Connection connection = getConnection(channel, System.currentTimeMillis()); + SimpleServerRpcConnection register(SocketChannel channel) { + SimpleServerRpcConnection connection = getConnection(channel, System.currentTimeMillis()); add(connection); if (LOG.isDebugEnabled()) { LOG.debug("Server connection from " + connection + @@ -1291,7 +618,7 @@ public class SimpleRpcServer extends RpcServer { return connection; } - boolean close(Connection connection) { + boolean close(SimpleServerRpcConnection connection) { boolean exists = remove(connection); if (exists) { if (LOG.isDebugEnabled()) { @@ -1314,7 +641,7 @@ public class SimpleRpcServer extends RpcServer { // during the iteration, but that's ok because they won't // be idle yet anyway and will be caught on next scan int closed = 0; - for (Connection connection : connections) { + for (SimpleServerRpcConnection connection : connections) { // stop if connections dropped below threshold unless scanning all if (!scanAll && size() < idleScanThreshold) { break; @@ -1332,7 +659,7 @@ public class SimpleRpcServer extends RpcServer { void closeAll() { // use a copy of the connections to be absolutely sure the concurrent // iterator doesn't miss a connection - for (Connection connection : toArray()) { + for (SimpleServerRpcConnection connection : toArray()) { close(connection); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java new file mode 100644 index 00000000000..5f072a9f7c0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.util.StringUtils; + +/** + * Sends responses of RPC back to clients. + */ +@InterfaceAudience.Private +class SimpleRpcServerResponder extends Thread { + /** */ + private final SimpleRpcServer simpleRpcServer; + private final Selector writeSelector; + private final Set writingCons = + Collections.newSetFromMap(new ConcurrentHashMap()); + + SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException { + this.simpleRpcServer = simpleRpcServer; + this.setName("RpcServer.responder"); + this.setDaemon(true); + this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER); + writeSelector = Selector.open(); // create a selector + } + + @Override + public void run() { + SimpleRpcServer.LOG.debug(getName() + ": starting"); + try { + doRunLoop(); + } finally { + SimpleRpcServer.LOG.info(getName() + ": stopping"); + try { + writeSelector.close(); + } catch (IOException ioe) { + SimpleRpcServer.LOG.error(getName() + ": couldn't close write selector", ioe); + } + } + } + + /** + * Take the list of the connections that want to write, and register them in the selector. + */ + private void registerWrites() { + Iterator it = writingCons.iterator(); + while (it.hasNext()) { + SimpleServerRpcConnection c = it.next(); + it.remove(); + SelectionKey sk = c.channel.keyFor(writeSelector); + try { + if (sk == null) { + try { + c.channel.register(writeSelector, SelectionKey.OP_WRITE, c); + } catch (ClosedChannelException e) { + // ignore: the client went away. + if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e); + } + } else { + sk.interestOps(SelectionKey.OP_WRITE); + } + } catch (CancelledKeyException e) { + // ignore: the client went away. + if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e); + } + } + } + + /** + * Add a connection to the list that want to write, + */ + public void registerForWrite(SimpleServerRpcConnection c) { + if (writingCons.add(c)) { + writeSelector.wakeup(); + } + } + + private void doRunLoop() { + long lastPurgeTime = 0; // last check for old calls. + while (this.simpleRpcServer.running) { + try { + registerWrites(); + int keyCt = writeSelector.select(this.simpleRpcServer.purgeTimeout); + if (keyCt == 0) { + continue; + } + + Set keys = writeSelector.selectedKeys(); + Iterator iter = keys.iterator(); + while (iter.hasNext()) { + SelectionKey key = iter.next(); + iter.remove(); + try { + if (key.isValid() && key.isWritable()) { + doAsyncWrite(key); + } + } catch (IOException e) { + SimpleRpcServer.LOG.debug(getName() + ": asyncWrite", e); + } + } + + lastPurgeTime = purge(lastPurgeTime); + + } catch (OutOfMemoryError e) { + if (this.simpleRpcServer.errorHandler != null) { + if (this.simpleRpcServer.errorHandler.checkOOME(e)) { + SimpleRpcServer.LOG.info(getName() + ": exiting on OutOfMemoryError"); + return; + } + } else { + // + // we can run out of memory if we have too many threads + // log the event and sleep for a minute and give + // some thread(s) a chance to finish + // + SimpleRpcServer.LOG.warn(getName() + ": OutOfMemoryError in server select", e); + try { + Thread.sleep(60000); + } catch (InterruptedException ex) { + SimpleRpcServer.LOG.debug("Interrupted while sleeping"); + return; + } + } + } catch (Exception e) { + SimpleRpcServer.LOG + .warn(getName() + ": exception in Responder " + StringUtils.stringifyException(e), e); + } + } + SimpleRpcServer.LOG.info(getName() + ": stopped"); + } + + /** + * If there were some calls that have not been sent out for a long time, we close the connection. + * @return the time of the purge. + */ + private long purge(long lastPurgeTime) { + long now = System.currentTimeMillis(); + if (now < lastPurgeTime + this.simpleRpcServer.purgeTimeout) { + return lastPurgeTime; + } + + ArrayList conWithOldCalls = new ArrayList<>(); + // get the list of channels from list of keys. + synchronized (writeSelector.keys()) { + for (SelectionKey key : writeSelector.keys()) { + SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment(); + if (connection == null) { + throw new IllegalStateException("Coding error: SelectionKey key without attachment."); + } + SimpleServerCall call = connection.responseQueue.peekFirst(); + if (call != null && now > call.lastSentTime + this.simpleRpcServer.purgeTimeout) { + conWithOldCalls.add(call.getConnection()); + } + } + } + + // Seems safer to close the connection outside of the synchronized loop... + for (SimpleServerRpcConnection connection : conWithOldCalls) { + this.simpleRpcServer.closeConnection(connection); + } + + return now; + } + + private void doAsyncWrite(SelectionKey key) throws IOException { + SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment(); + if (connection == null) { + throw new IOException("doAsyncWrite: no connection"); + } + if (key.channel() != connection.channel) { + throw new IOException("doAsyncWrite: bad channel"); + } + + if (processAllResponses(connection)) { + try { + // We wrote everything, so we don't need to be told when the socket is ready for + // write anymore. + key.interestOps(0); + } catch (CancelledKeyException e) { + /* + * The Listener/reader might have closed the socket. We don't explicitly cancel the key, so + * not sure if this will ever fire. This warning could be removed. + */ + SimpleRpcServer.LOG.warn("Exception while changing ops : " + e); + } + } + } + + /** + * Process the response for this call. You need to have the lock on + * {@link org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection#responseWriteLock} + * @param call the call + * @return true if we proceed the call fully, false otherwise. + * @throws IOException + */ + boolean processResponse(final SimpleServerCall call) throws IOException { + boolean error = true; + try { + // Send as much data as we can in the non-blocking fashion + long numBytes = + this.simpleRpcServer.channelWrite(call.getConnection().channel, call.response); + if (numBytes < 0) { + throw new HBaseIOException( + "Error writing on the socket " + "for the call:" + call.toShortString()); + } + error = false; + } finally { + if (error) { + SimpleRpcServer.LOG.debug(getName() + call.toShortString() + ": output error -- closing"); + // We will be closing this connection itself. Mark this call as done so that all the + // buffer(s) it got from pool can get released + call.done(); + this.simpleRpcServer.closeConnection(call.getConnection()); + } + } + + if (!call.response.hasRemaining()) { + call.done(); + return true; + } else { + return false; // Socket can't take more, we will have to come back. + } + } + + /** + * Process all the responses for this connection + * @return true if all the calls were processed or that someone else is doing it. false if there * + * is still some work to do. In this case, we expect the caller to delay us. + * @throws IOException + */ + private boolean processAllResponses(final SimpleServerRpcConnection connection) + throws IOException { + // We want only one writer on the channel for a connection at a time. + connection.responseWriteLock.lock(); + try { + for (int i = 0; i < 20; i++) { + // protection if some handlers manage to need all the responder + SimpleServerCall call = connection.responseQueue.pollFirst(); + if (call == null) { + return true; + } + if (!processResponse(call)) { + connection.responseQueue.addFirst(call); + return false; + } + } + } finally { + connection.responseWriteLock.unlock(); + } + + return connection.responseQueue.isEmpty(); + } + + // + // Enqueue a response from the application. + // + void doRespond(SimpleServerCall call) throws IOException { + boolean added = false; + + // If there is already a write in progress, we don't wait. This allows to free the handlers + // immediately for other tasks. + if (call.getConnection().responseQueue.isEmpty() && + call.getConnection().responseWriteLock.tryLock()) { + try { + if (call.getConnection().responseQueue.isEmpty()) { + // If we're alone, we can try to do a direct call to the socket. It's + // an optimisation to save on context switches and data transfer between cores.. + if (processResponse(call)) { + return; // we're done. + } + // Too big to fit, putting ahead. + call.getConnection().responseQueue.addFirst(call); + added = true; // We will register to the selector later, outside of the lock. + } + } finally { + call.getConnection().responseWriteLock.unlock(); + } + } + + if (!added) { + call.getConnection().responseQueue.addLast(call); + } + call.responder.registerForWrite(call.getConnection()); + + // set the serve time when the response has to be sent later + call.lastSentTime = System.currentTimeMillis(); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java index b82d34891bb..af575ea42da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java @@ -24,8 +24,6 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.ByteBufferPool; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; -import org.apache.hadoop.hbase.ipc.SimpleRpcServer.Connection; -import org.apache.hadoop.hbase.ipc.SimpleRpcServer.Responder; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; @@ -37,19 +35,19 @@ import org.apache.htrace.TraceInfo; * result. */ @InterfaceAudience.Private -class SimpleServerCall extends ServerCall { +class SimpleServerCall extends ServerCall { long lastSentTime; - final Responder responder; + final SimpleRpcServerResponder responder; @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH", justification = "Can't figure why this complaint is happening... see below") SimpleServerCall(int id, final BlockingService service, final MethodDescriptor md, - RequestHeader header, Message param, CellScanner cellScanner, RpcServer.Connection connection, - long size, TraceInfo tinfo, final InetAddress remoteAddress, long receiveTime, int timeout, - ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup, - Responder responder) { + RequestHeader header, Message param, CellScanner cellScanner, + SimpleServerRpcConnection connection, long size, TraceInfo tinfo, + final InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir, + CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup, SimpleRpcServerResponder responder) { super(id, service, md, header, param, cellScanner, connection, size, tinfo, remoteAddress, receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup); this.responder = responder; @@ -73,7 +71,7 @@ class SimpleServerCall extends ServerCall { this.responder.doRespond(this); } - Connection getConnection() { - return (Connection) this.connection; + SimpleServerRpcConnection getConnection() { + return (SimpleServerRpcConnection) this.connection; } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java new file mode 100644 index 00000000000..50a1a6be5dc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java @@ -0,0 +1,428 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SocketChannel; +import java.util.Arrays; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.VersionInfoUtil; +import org.apache.hadoop.hbase.exceptions.RequestTooBigException; +import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; +import org.apache.hadoop.hbase.security.AccessDeniedException; +import org.apache.hadoop.hbase.security.AuthMethod; +import org.apache.hadoop.hbase.security.SaslStatus; +import org.apache.hadoop.hbase.security.SaslUtil; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.IntWritable; +import org.apache.htrace.TraceInfo; + +/** Reads calls from a connection and queues them for handling. */ +@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", + justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") +@InterfaceAudience.Private +class SimpleServerRpcConnection extends ServerRpcConnection { + + final SocketChannel channel; + private ByteBuff data; + private ByteBuffer dataLengthBuffer; + protected final ConcurrentLinkedDeque responseQueue = + new ConcurrentLinkedDeque<>(); + final Lock responseWriteLock = new ReentrantLock(); + private final LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs + private long lastContact; + private final Socket socket; + private final SimpleRpcServerResponder responder; + + public SimpleServerRpcConnection(SimpleRpcServer rpcServer, SocketChannel channel, + long lastContact) { + super(rpcServer); + this.channel = channel; + this.lastContact = lastContact; + this.data = null; + this.dataLengthBuffer = ByteBuffer.allocate(4); + this.socket = channel.socket(); + this.addr = socket.getInetAddress(); + if (addr == null) { + this.hostAddress = "*Unknown*"; + } else { + this.hostAddress = addr.getHostAddress(); + } + this.remotePort = socket.getPort(); + if (rpcServer.socketSendBufferSize != 0) { + try { + socket.setSendBufferSize(rpcServer.socketSendBufferSize); + } catch (IOException e) { + SimpleRpcServer.LOG.warn( + "Connection: unable to set socket send buffer size to " + rpcServer.socketSendBufferSize); + } + } + this.saslCall = new SimpleServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null, + null, System.currentTimeMillis(), 0, rpcServer.reservoir, rpcServer.cellBlockBuilder, null, + rpcServer.responder); + this.setConnectionHeaderResponseCall = new SimpleServerCall(CONNECTION_HEADER_RESPONSE_CALLID, + null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0, + rpcServer.reservoir, rpcServer.cellBlockBuilder, null, rpcServer.responder); + this.authFailedCall = new SimpleServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null, + null, this, 0, null, null, System.currentTimeMillis(), 0, rpcServer.reservoir, + rpcServer.cellBlockBuilder, null, rpcServer.responder); + this.responder = rpcServer.responder; + } + + public void setLastContact(long lastContact) { + this.lastContact = lastContact; + } + + public long getLastContact() { + return lastContact; + } + + /* Return true if the connection has no outstanding rpc */ + boolean isIdle() { + return rpcCount.sum() == 0; + } + + /* Decrement the outstanding RPC count */ + protected void decRpcCount() { + rpcCount.decrement(); + } + + /* Increment the outstanding RPC count */ + protected void incRpcCount() { + rpcCount.increment(); + } + + private int readPreamble() throws IOException { + int count; + // Check for 'HBas' magic. + this.dataLengthBuffer.flip(); + if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) { + return doBadPreambleHandling( + "Expected HEADER=" + Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" + + Bytes.toStringBinary(dataLengthBuffer.array()) + " from " + toString()); + } + // Now read the next two bytes, the version and the auth to use. + ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2); + count = this.rpcServer.channelRead(channel, versionAndAuthBytes); + if (count < 0 || versionAndAuthBytes.remaining() > 0) { + return count; + } + int version = versionAndAuthBytes.get(0); + byte authbyte = versionAndAuthBytes.get(1); + this.authMethod = AuthMethod.valueOf(authbyte); + if (version != SimpleRpcServer.CURRENT_VERSION) { + String msg = getFatalConnectionString(version, authbyte); + return doBadPreambleHandling(msg, new WrongVersionException(msg)); + } + if (authMethod == null) { + String msg = getFatalConnectionString(version, authbyte); + return doBadPreambleHandling(msg, new BadAuthException(msg)); + } + if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) { + if (this.rpcServer.allowFallbackToSimpleAuth) { + this.rpcServer.metrics.authenticationFallback(); + authenticatedWithFallback = true; + } else { + AccessDeniedException ae = new AccessDeniedException("Authentication is required"); + this.rpcServer.setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage()); + authFailedCall.sendResponseIfReady(); + throw ae; + } + } + if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) { + doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, + null); + authMethod = AuthMethod.SIMPLE; + // client has already sent the initial Sasl message and we + // should ignore it. Both client and server should fall back + // to simple auth from now on. + skipInitialSaslHandshake = true; + } + if (authMethod != AuthMethod.SIMPLE) { + useSasl = true; + } + + dataLengthBuffer.clear(); + connectionPreambleRead = true; + return count; + } + + private int read4Bytes() throws IOException { + if (this.dataLengthBuffer.remaining() > 0) { + return this.rpcServer.channelRead(channel, this.dataLengthBuffer); + } else { + return 0; + } + } + + /** + * Read off the wire. If there is not enough data to read, update the connection state with what + * we have and returns. + * @return Returns -1 if failure (and caller will close connection), else zero or more. + * @throws IOException + * @throws InterruptedException + */ + public int readAndProcess() throws IOException, InterruptedException { + // Try and read in an int. If new connection, the int will hold the 'HBas' HEADER. If it + // does, read in the rest of the connection preamble, the version and the auth method. + // Else it will be length of the data to read (or -1 if a ping). We catch the integer + // length into the 4-byte this.dataLengthBuffer. + int count = read4Bytes(); + if (count < 0 || dataLengthBuffer.remaining() > 0) { + return count; + } + + // If we have not read the connection setup preamble, look to see if that is on the wire. + if (!connectionPreambleRead) { + count = readPreamble(); + if (!connectionPreambleRead) { + return count; + } + + count = read4Bytes(); + if (count < 0 || dataLengthBuffer.remaining() > 0) { + return count; + } + } + + // We have read a length and we have read the preamble. It is either the connection header + // or it is a request. + if (data == null) { + dataLengthBuffer.flip(); + int dataLength = dataLengthBuffer.getInt(); + if (dataLength == RpcClient.PING_CALL_ID) { + if (!useWrap) { // covers the !useSasl too + dataLengthBuffer.clear(); + return 0; // ping message + } + } + if (dataLength < 0) { // A data length of zero is legal. + throw new DoNotRetryIOException( + "Unexpected data length " + dataLength + "!! from " + getHostAddress()); + } + + if (dataLength > this.rpcServer.maxRequestSize) { + String msg = "RPC data length of " + dataLength + " received from " + getHostAddress() + + " is greater than max allowed " + this.rpcServer.maxRequestSize + ". Set \"" + + SimpleRpcServer.MAX_REQUEST_SIZE + + "\" on server to override this limit (not recommended)"; + SimpleRpcServer.LOG.warn(msg); + + if (connectionHeaderRead && connectionPreambleRead) { + incRpcCount(); + // Construct InputStream for the non-blocking SocketChannel + // We need the InputStream because we want to read only the request header + // instead of the whole rpc. + ByteBuffer buf = ByteBuffer.allocate(1); + InputStream is = new InputStream() { + @Override + public int read() throws IOException { + SimpleServerRpcConnection.this.rpcServer.channelRead(channel, buf); + buf.flip(); + int x = buf.get(); + buf.flip(); + return x; + } + }; + CodedInputStream cis = CodedInputStream.newInstance(is); + int headerSize = cis.readRawVarint32(); + Message.Builder builder = RequestHeader.newBuilder(); + ProtobufUtil.mergeFrom(builder, cis, headerSize); + RequestHeader header = (RequestHeader) builder.build(); + + // Notify the client about the offending request + SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null, + null, null, null, this, 0, null, this.addr, System.currentTimeMillis(), 0, + this.rpcServer.reservoir, this.rpcServer.cellBlockBuilder, null, responder); + this.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION); + // Make sure the client recognizes the underlying exception + // Otherwise, throw a DoNotRetryIOException. + if (VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(), + RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) { + this.rpcServer.setupResponse(null, reqTooBig, SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION, + msg); + } else { + this.rpcServer.setupResponse(null, reqTooBig, new DoNotRetryIOException(), msg); + } + // We are going to close the connection, make sure we process the response + // before that. In rare case when this fails, we still close the connection. + responseWriteLock.lock(); + try { + this.responder.processResponse(reqTooBig); + } finally { + responseWriteLock.unlock(); + } + } + // Close the connection + return -1; + } + + // Initialize this.data with a ByteBuff. + // This call will allocate a ByteBuff to read request into and assign to this.data + // Also when we use some buffer(s) from pool, it will create a CallCleanup instance also and + // assign to this.callCleanup + initByteBuffToReadInto(dataLength); + + // Increment the rpc count. This counter will be decreased when we write + // the response. If we want the connection to be detected as idle properly, we + // need to keep the inc / dec correct. + incRpcCount(); + } + + count = channelDataRead(channel, data); + + if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0 + process(); + } + + return count; + } + + // It creates the ByteBuff and CallCleanup and assign to Connection instance. + private void initByteBuffToReadInto(int length) { + // We create random on heap buffers are read into those when + // 1. ByteBufferPool is not there. + // 2. When the size of the req is very small. Using a large sized (64 KB) buffer from pool is + // waste then. Also if all the reqs are of this size, we will be creating larger sized + // buffers and pool them permanently. This include Scan/Get request and DDL kind of reqs like + // RegionOpen. + // 3. If it is an initial handshake signal or initial connection request. Any way then + // condition 2 itself will match + // 4. When SASL use is ON. + if (this.rpcServer.reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead || + useSasl || length < this.rpcServer.minSizeForReservoirUse) { + this.data = new SingleByteBuff(ByteBuffer.allocate(length)); + } else { + Pair pair = RpcServer.allocateByteBuffToReadInto( + this.rpcServer.reservoir, this.rpcServer.minSizeForReservoirUse, length); + this.data = pair.getFirst(); + this.callCleanup = pair.getSecond(); + } + } + + protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException { + int count = buf.read(channel); + if (count > 0) { + this.rpcServer.metrics.receivedBytes(count); + } + return count; + } + + /** + * Process the data buffer and clean the connection state for the next call. + */ + private void process() throws IOException, InterruptedException { + data.rewind(); + try { + if (skipInitialSaslHandshake) { + skipInitialSaslHandshake = false; + return; + } + + if (useSasl) { + saslReadAndProcess(data); + } else { + processOneRpc(data); + } + + } finally { + dataLengthBuffer.clear(); // Clean for the next call + data = null; // For the GC + this.callCleanup = null; + } + } + + private int doBadPreambleHandling(final String msg) throws IOException { + return doBadPreambleHandling(msg, new FatalConnectionException(msg)); + } + + private int doBadPreambleHandling(final String msg, final Exception e) throws IOException { + SimpleRpcServer.LOG.warn(msg); + SimpleServerCall fakeCall = new SimpleServerCall(-1, null, null, null, null, null, this, -1, + null, null, System.currentTimeMillis(), 0, this.rpcServer.reservoir, + this.rpcServer.cellBlockBuilder, null, responder); + this.rpcServer.setupResponse(null, fakeCall, e, msg); + this.responder.doRespond(fakeCall); + // Returning -1 closes out the connection. + return -1; + } + + @Override + public synchronized void close() { + disposeSasl(); + data = null; + callCleanup = null; + if (!channel.isOpen()) return; + try { + socket.shutdownOutput(); + } catch (Exception ignored) { + if (SimpleRpcServer.LOG.isTraceEnabled()) { + SimpleRpcServer.LOG.trace("Ignored exception", ignored); + } + } + if (channel.isOpen()) { + try { + channel.close(); + } catch (Exception ignored) { + } + } + try { + socket.close(); + } catch (Exception ignored) { + if (SimpleRpcServer.LOG.isTraceEnabled()) { + SimpleRpcServer.LOG.trace("Ignored exception", ignored); + } + } + } + + @Override + public boolean isConnectionOpen() { + return channel.isOpen(); + } + + @Override + public SimpleServerCall createCall(int id, BlockingService service, MethodDescriptor md, + RequestHeader header, Message param, CellScanner cellScanner, long size, TraceInfo tinfo, + InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { + return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size, tinfo, + remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir, + this.rpcServer.cellBlockBuilder, reqCleanup, this.responder); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java index cfb0e02ccdc..eb325adc65a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java @@ -23,6 +23,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.util.Locale; import java.util.Map; +import java.util.function.Consumer; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; @@ -34,14 +35,13 @@ import javax.security.sasl.RealmCallback; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; -import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.security.token.TokenIdentifier; /** * A utility class for dealing with SASL on RPC server @@ -79,13 +79,12 @@ public class HBaseSaslRpcServer { /** CallbackHandler for SASL DIGEST-MD5 mechanism */ public static class SaslDigestCallbackHandler implements CallbackHandler { private SecretManager secretManager; - private RpcServer.Connection connection; + private Consumer attemptingUserConsumer; - public SaslDigestCallbackHandler( - SecretManager secretManager, - RpcServer.Connection connection) { + public SaslDigestCallbackHandler(SecretManager secretManager, + Consumer attemptingUserConsumer) { this.secretManager = secretManager; - this.connection = connection; + this.attemptingUserConsumer = attemptingUserConsumer; } private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken { @@ -116,12 +115,11 @@ public class HBaseSaslRpcServer { if (pc != null) { TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(), secretManager); char[] password = getPassword(tokenIdentifier); - UserGroupInformation user = null; - user = tokenIdentifier.getUser(); // may throw exception - connection.attemptingUser = user; + UserGroupInformation user = tokenIdentifier.getUser(); // may throw exception + attemptingUserConsumer.accept(user); if (LOG.isTraceEnabled()) { - LOG.trace("SASL server DIGEST-MD5 callback: setting password " - + "for client: " + tokenIdentifier.getUser()); + LOG.trace("SASL server DIGEST-MD5 callback: setting password " + "for client: " + + tokenIdentifier.getUser()); } pc.setPassword(password); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 581e50e21ac..2bd750eb4e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -315,9 +315,10 @@ public abstract class AbstractTestIPC { new InetSocketAddress("localhost", 0), conf, scheduler); } - class FailingConnection extends Connection { - public FailingConnection(SocketChannel channel, long lastContact) { - super(channel, lastContact); + class FailingConnection extends SimpleServerRpcConnection { + public FailingConnection(TestFailingRpcServer rpcServer, SocketChannel channel, + long lastContact) { + super(rpcServer, channel, lastContact); } @Override @@ -329,8 +330,8 @@ public abstract class AbstractTestIPC { } @Override - protected Connection getConnection(SocketChannel channel, long time) { - return new FailingConnection(channel, time); + protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) { + return new FailingConnection(this, channel, time); } }