HBASE-18012 Move RpcServer.Connection to a separated file

zhangduo 2017-05-10 11:05:38 +08:00
parent 5cdaca5c00
commit 341223d86c
14 changed files with 1869 additions and 1737 deletions
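Before the per-file diffs, a minimal, self-contained Java sketch of the shape this commit introduces: RpcServer.Connection is hoisted out into a standalone ServerRpcConnection class, NettyRpcServer.NettyConnection becomes NettyServerRpcConnection, and ServerCall becomes generic in its connection type so the Netty subclass reaches its channel without a cast. The class names mirror the diff; the members are simplified stand-ins, not the real HBase API.

import java.io.Closeable;

// Stand-in for the new top-level class extracted from RpcServer.Connection.
abstract class ServerRpcConnection implements Closeable {
  // An explicit back-reference replaces the inner class's implicit RpcServer.this.
  protected final Object rpcServer;
  ServerRpcConnection(Object rpcServer) { this.rpcServer = rpcServer; }
  @Override public void close() {}
}

// Stand-in for NettyServerRpcConnection, which replaces NettyRpcServer.NettyConnection.
class NettyServerRpcConnection extends ServerRpcConnection {
  final StringBuilder channel = new StringBuilder(); // stands in for io.netty.channel.Channel
  NettyServerRpcConnection(Object rpcServer) { super(rpcServer); }
}

// ServerCall is now generic in its connection type ...
abstract class ServerCall<T extends ServerRpcConnection> {
  protected final T connection;
  ServerCall(T connection) { this.connection = connection; }
}

// ... so NettyServerCall uses typed field access where the old code needed
// a getConnection() helper that cast RpcServer.Connection to NettyConnection.
class NettyServerCall extends ServerCall<NettyServerRpcConnection> {
  NettyServerCall(NettyServerRpcConnection conn) { super(conn); }
  void sendResponseIfReady() {
    connection.channel.append("response"); // no cast needed
  }
}

Only the deprecated CallRunner.getCall() keeps a cast, now spelled ServerCall<?>, as the CallRunner hunk below shows.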

View File

@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.nio;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
@@ -27,8 +29,6 @@ import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.hbase.util.UnsafeAccess;
import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
import com.google.common.annotations.VisibleForTesting;
import sun.nio.ch.DirectBuffer;
/**

View File

@@ -75,8 +75,8 @@ public class CallRunner {
* @deprecated As of release 2.0, this will be removed in HBase 3.0
*/
@Deprecated
public ServerCall getCall() {
public ServerCall<?> getCall() {
return (ServerCall) call;
return (ServerCall<?>) call;
}
public void setStatus(MonitoredRPCHandler status) {

View File

@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import io.netty.bootstrap.ServerBootstrap;
@@ -46,10 +45,7 @@ import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -57,31 +53,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.htrace.TraceInfo;
/**
* An RPC server with Netty4 implementation.
*
*/
@InterfaceAudience.Private
public class NettyRpcServer extends RpcServer {
public static final Log LOG = LogFactory.getLog(NettyRpcServer.class);
@@ -187,166 +173,6 @@ public class NettyRpcServer extends RpcServer {
return ((InetSocketAddress) serverChannel.localAddress());
}
public class NettyConnection extends RpcServer.Connection {
protected Channel channel;
NettyConnection(Channel channel) {
super();
this.channel = channel;
InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress());
this.addr = inetSocketAddress.getAddress();
if (addr == null) {
this.hostAddress = "*Unknown*";
} else {
this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
}
this.remotePort = inetSocketAddress.getPort();
this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
this.setConnectionHeaderResponseCall =
new NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null, this,
0, null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
this.authFailedCall =
new NettyServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, 0,
null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
}
void readPreamble(ByteBuf buffer) throws IOException {
byte[] rpcHead =
{ buffer.readByte(), buffer.readByte(), buffer.readByte(), buffer.readByte() };
if (!Arrays.equals(HConstants.RPC_HEADER, rpcHead)) {
doBadPreambleHandling("Expected HEADER="
+ Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER="
+ Bytes.toStringBinary(rpcHead) + " from " + toString());
return;
}
// Now read the next two bytes, the version and the auth to use.
int version = buffer.readByte();
byte authbyte = buffer.readByte();
this.authMethod = AuthMethod.valueOf(authbyte);
if (version != CURRENT_VERSION) {
String msg = getFatalConnectionString(version, authbyte);
doBadPreambleHandling(msg, new WrongVersionException(msg));
return;
}
if (authMethod == null) {
String msg = getFatalConnectionString(version, authbyte);
doBadPreambleHandling(msg, new BadAuthException(msg));
return;
}
if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
if (allowFallbackToSimpleAuth) {
metrics.authenticationFallback();
authenticatedWithFallback = true;
} else {
AccessDeniedException ae = new AccessDeniedException(
"Authentication is required");
setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
((NettyServerCall) authFailedCall)
.sendResponseIfReady(ChannelFutureListener.CLOSE);
return;
}
}
if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
null);
authMethod = AuthMethod.SIMPLE;
// client has already sent the initial Sasl message and we
// should ignore it. Both client and server should fall back
// to simple auth from now on.
skipInitialSaslHandshake = true;
}
if (authMethod != AuthMethod.SIMPLE) {
useSasl = true;
}
connectionPreambleRead = true;
}
private void doBadPreambleHandling(final String msg) throws IOException {
doBadPreambleHandling(msg, new FatalConnectionException(msg));
}
private void doBadPreambleHandling(final String msg, final Exception e) throws IOException {
LOG.warn(msg);
NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, null, this, -1,
null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
setupResponse(null, fakeCall, e, msg);
// closes out the connection.
fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE);
}
void process(final ByteBuf buf) throws IOException, InterruptedException {
if (connectionHeaderRead) {
this.callCleanup = new RpcServer.CallCleanup() {
@Override
public void run() {
buf.release();
}
};
process(new SingleByteBuff(buf.nioBuffer()));
} else {
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data, 0, data.length);
ByteBuffer connectionHeader = ByteBuffer.wrap(data);
buf.release();
process(connectionHeader);
}
}
void process(ByteBuffer buf) throws IOException, InterruptedException {
process(new SingleByteBuff(buf));
}
void process(ByteBuff buf) throws IOException, InterruptedException {
try {
if (skipInitialSaslHandshake) {
skipInitialSaslHandshake = false;
if (callCleanup != null) {
callCleanup.run();
}
return;
}
if (useSasl) {
saslReadAndProcess(buf);
} else {
processOneRpc(buf);
}
} catch (Exception e) {
if (callCleanup != null) {
callCleanup.run();
}
throw e;
} finally {
this.callCleanup = null;
}
}
@Override
public synchronized void close() {
disposeSasl();
channel.close();
callCleanup = null;
}
@Override
public boolean isConnectionOpen() {
return channel.isOpen();
}
@Override
public ServerCall createCall(int id, final BlockingService service,
final MethodDescriptor md, RequestHeader header, Message param,
CellScanner cellScanner, RpcServer.Connection connection, long size,
TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
CallCleanup reqCleanup) {
return new NettyServerCall(id, service, md, header, param, cellScanner, connection, size,
tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder,
reqCleanup);
}
}
private class Initializer extends ChannelInitializer<SocketChannel> {
final int maxRequestSize;
@@ -368,7 +194,7 @@
}
private class ConnectionHeaderHandler extends ByteToMessageDecoder {
private NettyConnection connection;
private NettyServerRpcConnection connection;
ConnectionHeaderHandler() {
}
@@ -379,7 +205,7 @@
if (byteBuf.readableBytes() < 6) {
return;
}
connection = new NettyConnection(ctx.channel());
connection = new NettyServerRpcConnection(NettyRpcServer.this, ctx.channel());
connection.readPreamble(byteBuf);
((MessageDecoder) ctx.pipeline().get("decoder"))
.setConnection(connection);
@@ -390,9 +216,9 @@
private class MessageDecoder extends ChannelInboundHandlerAdapter {
private NettyConnection connection;
private NettyServerRpcConnection connection;
void setConnection(NettyConnection connection) {
void setConnection(NettyServerRpcConnection connection) {
this.connection = connection;
}

View File

@@ -25,7 +25,6 @@ import java.net.InetAddress;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.NettyRpcServer.NettyConnection;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
@@ -38,30 +37,26 @@ import org.apache.htrace.TraceInfo;
* result.
*/
@InterfaceAudience.Private
class NettyServerCall extends ServerCall {
class NettyServerCall extends ServerCall<NettyServerRpcConnection> {
NettyServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, RpcServer.Connection connection, long size,
Message param, CellScanner cellScanner, NettyServerRpcConnection connection, long size,
TraceInfo tinfo, InetAddress remoteAddress, long receiveTime, int timeout,
ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
super(id, service, md, header, param, cellScanner, connection, size, tinfo, remoteAddress,
receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
}
NettyConnection getConnection() {
return (NettyConnection) this.connection;
}
/**
* If we have a response, and delay is not set, then respond immediately. Otherwise, do not
* respond to client. This is called by the RPC code in the context of the Handler thread.
*/
@Override
public synchronized void sendResponseIfReady() throws IOException {
getConnection().channel.writeAndFlush(this);
connection.channel.writeAndFlush(this);
}
public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException {
getConnection().channel.writeAndFlush(this).addListener(listener);
connection.channel.writeAndFlush(this).addListener(listener);
}
}
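The listener overload above uses a standard Netty idiom: attaching ChannelFutureListener.CLOSE to the write future closes the channel only after the response has been flushed, which is how the connection's fatal replies (bad preamble, failed authentication) are delivered before teardown. A small, HBase-independent illustration using Netty's EmbeddedChannel:

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.embedded.EmbeddedChannel;

public class CloseAfterFlushSketch {
  public static void main(String[] args) {
    EmbeddedChannel channel = new EmbeddedChannel();
    // Write a stand-in response, then close the channel once the write completes.
    channel.writeAndFlush("fatal response").addListener(ChannelFutureListener.CLOSE);
    System.out.println("open after flush: " + channel.isOpen()); // prints false
  }
}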

View File

@@ -0,0 +1,206 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.htrace.TraceInfo;
/**
* RpcConnection implementation for netty rpc server.
*/
@InterfaceAudience.Private
class NettyServerRpcConnection extends ServerRpcConnection {
final Channel channel;
NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) {
super(rpcServer);
this.channel = channel;
InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress());
this.addr = inetSocketAddress.getAddress();
if (addr == null) {
this.hostAddress = "*Unknown*";
} else {
this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
}
this.remotePort = inetSocketAddress.getPort();
this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
null, System.currentTimeMillis(), 0, rpcServer.reservoir, rpcServer.cellBlockBuilder, null);
this.setConnectionHeaderResponseCall = new NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID,
null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0,
rpcServer.reservoir, rpcServer.cellBlockBuilder, null);
this.authFailedCall = new NettyServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null,
null, this, 0, null, null, System.currentTimeMillis(), 0, rpcServer.reservoir,
rpcServer.cellBlockBuilder, null);
}
void readPreamble(ByteBuf buffer) throws IOException {
byte[] rpcHead = { buffer.readByte(), buffer.readByte(), buffer.readByte(), buffer.readByte() };
if (!Arrays.equals(HConstants.RPC_HEADER, rpcHead)) {
doBadPreambleHandling("Expected HEADER=" + Bytes.toStringBinary(HConstants.RPC_HEADER) +
" but received HEADER=" + Bytes.toStringBinary(rpcHead) + " from " + toString());
return;
}
// Now read the next two bytes, the version and the auth to use.
int version = buffer.readByte();
byte authbyte = buffer.readByte();
this.authMethod = AuthMethod.valueOf(authbyte);
if (version != NettyRpcServer.CURRENT_VERSION) {
String msg = getFatalConnectionString(version, authbyte);
doBadPreambleHandling(msg, new WrongVersionException(msg));
return;
}
if (authMethod == null) {
String msg = getFatalConnectionString(version, authbyte);
doBadPreambleHandling(msg, new BadAuthException(msg));
return;
}
if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
if (this.rpcServer.allowFallbackToSimpleAuth) {
this.rpcServer.metrics.authenticationFallback();
authenticatedWithFallback = true;
} else {
AccessDeniedException ae = new AccessDeniedException("Authentication is required");
this.rpcServer.setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
((NettyServerCall) authFailedCall).sendResponseIfReady(ChannelFutureListener.CLOSE);
return;
}
}
if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
null);
authMethod = AuthMethod.SIMPLE;
// client has already sent the initial Sasl message and we
// should ignore it. Both client and server should fall back
// to simple auth from now on.
skipInitialSaslHandshake = true;
}
if (authMethod != AuthMethod.SIMPLE) {
useSasl = true;
}
connectionPreambleRead = true;
}
private void doBadPreambleHandling(final String msg) throws IOException {
doBadPreambleHandling(msg, new FatalConnectionException(msg));
}
private void doBadPreambleHandling(final String msg, final Exception e) throws IOException {
NettyRpcServer.LOG.warn(msg);
NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, null, this, -1, null,
null, System.currentTimeMillis(), 0, this.rpcServer.reservoir,
this.rpcServer.cellBlockBuilder, null);
this.rpcServer.setupResponse(null, fakeCall, e, msg);
// closes out the connection.
fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE);
}
void process(final ByteBuf buf) throws IOException, InterruptedException {
if (connectionHeaderRead) {
this.callCleanup = new RpcServer.CallCleanup() {
@Override
public void run() {
buf.release();
}
};
process(new SingleByteBuff(buf.nioBuffer()));
} else {
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data, 0, data.length);
ByteBuffer connectionHeader = ByteBuffer.wrap(data);
buf.release();
process(connectionHeader);
}
}
void process(ByteBuffer buf) throws IOException, InterruptedException {
process(new SingleByteBuff(buf));
}
void process(ByteBuff buf) throws IOException, InterruptedException {
try {
if (skipInitialSaslHandshake) {
skipInitialSaslHandshake = false;
if (callCleanup != null) {
callCleanup.run();
}
return;
}
if (useSasl) {
saslReadAndProcess(buf);
} else {
processOneRpc(buf);
}
} catch (Exception e) {
if (callCleanup != null) {
callCleanup.run();
}
throw e;
} finally {
this.callCleanup = null;
}
}
@Override
public synchronized void close() {
disposeSasl();
channel.close();
callCleanup = null;
}
@Override
public boolean isConnectionOpen() {
return channel.isOpen();
}
@Override
public NettyServerCall createCall(int id, final BlockingService service,
final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner,
long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
CallCleanup reqCleanup) {
return new NettyServerCall(id, service, md, header, param, cellScanner, this, size, tinfo,
remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
this.rpcServer.cellBlockBuilder, reqCleanup);
}
}
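For context on readPreamble() above: ConnectionHeaderHandler in NettyRpcServer waits for six readable bytes before calling it, and those six bytes are the fixed connection preamble it consumes: a four-byte magic, one version byte, and one auth-method byte. A sketch of building such a preamble; the 'HBas' magic, version 0, and auth code 80 for SIMPLE are assumptions about HConstants.RPC_HEADER, RpcServer.CURRENT_VERSION, and AuthMethod.SIMPLE rather than values taken from this diff.

import java.nio.ByteBuffer;

public class PreambleSketch {
  public static void main(String[] args) {
    ByteBuffer preamble = ByteBuffer.allocate(6);
    preamble.put(new byte[] { 'H', 'B', 'a', 's' }); // assumed HConstants.RPC_HEADER magic
    preamble.put((byte) 0);                          // assumed RpcServer.CURRENT_VERSION
    preamble.put((byte) 80);                         // assumed byte code for AuthMethod.SIMPLE
    preamble.flip();
    System.out.println("preamble is " + preamble.remaining() + " bytes"); // 6
  }
}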

View File

@@ -20,34 +20,22 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
import java.io.ByteArrayInputStream;
import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.LongAdder;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.commons.crypto.cipher.CryptoCipherFactory;
import org.apache.commons.crypto.random.CryptoRandom;
import org.apache.commons.crypto.random.CryptoRandomFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -59,65 +47,36 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.htrace.TraceInfo;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
/**
* An RPC server that hosts protobuf described Services.
*
@@ -262,739 +221,6 @@ public abstract class RpcServer implements RpcServerInterface,
void run();
}
/** Reads calls from a connection and queues them for handling. */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="VO_VOLATILE_INCREMENT",
justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
public abstract class Connection implements Closeable {
// If initial preamble with version and magic has been read or not.
protected boolean connectionPreambleRead = false;
// If the connection header has been read or not.
protected boolean connectionHeaderRead = false;
protected CallCleanup callCleanup;
// Cache the remote host & port info so that even if the socket is
// disconnected, we can say where it used to connect to.
protected String hostAddress;
protected int remotePort;
protected InetAddress addr;
protected ConnectionHeader connectionHeader;
/**
* Codec the client asked use.
*/
protected Codec codec;
/**
* Compression codec the client asked us use.
*/
protected CompressionCodec compressionCodec;
protected BlockingService service;
protected AuthMethod authMethod;
protected boolean saslContextEstablished;
protected boolean skipInitialSaslHandshake;
private ByteBuffer unwrappedData;
// When is this set? FindBugs wants to know! Says NP
private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
protected boolean useSasl;
protected SaslServer saslServer;
protected CryptoAES cryptoAES;
protected boolean useWrap = false;
protected boolean useCryptoAesWrap = false;
// Fake 'call' for failed authorization response
protected static final int AUTHORIZATION_FAILED_CALLID = -1;
protected ServerCall authFailedCall;
protected ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
protected static final int SASL_CALLID = -33;
protected ServerCall saslCall;
// Fake 'call' for connection header response
protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
protected ServerCall setConnectionHeaderResponseCall;
// was authentication allowed with a fallback to simple auth
protected boolean authenticatedWithFallback;
protected boolean retryImmediatelySupported = false;
public UserGroupInformation attemptingUser = null; // user name before auth
protected User user = null;
protected UserGroupInformation ugi = null;
public Connection() {
this.callCleanup = null;
}
@Override
public String toString() {
return getHostAddress() + ":" + remotePort;
}
public String getHostAddress() {
return hostAddress;
}
public InetAddress getHostInetAddress() {
return addr;
}
public int getRemotePort() {
return remotePort;
}
public VersionInfo getVersionInfo() {
if (connectionHeader.hasVersionInfo()) {
return connectionHeader.getVersionInfo();
}
return null;
}
protected String getFatalConnectionString(final int version, final byte authByte) {
return "serverVersion=" + CURRENT_VERSION +
", clientVersion=" + version + ", authMethod=" + authByte +
", authSupported=" + (authMethod != null) + " from " + toString();
}
protected UserGroupInformation getAuthorizedUgi(String authorizedId)
throws IOException {
UserGroupInformation authorizedUgi;
if (authMethod == AuthMethod.DIGEST) {
TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
secretManager);
authorizedUgi = tokenId.getUser();
if (authorizedUgi == null) {
throw new AccessDeniedException(
"Can't retrieve username from tokenIdentifier.");
}
authorizedUgi.addTokenIdentifier(tokenId);
} else {
authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
}
authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
return authorizedUgi;
}
/**
* Set up cell block codecs
* @throws FatalConnectionException
*/
protected void setupCellBlockCodecs(final ConnectionHeader header)
throws FatalConnectionException {
// TODO: Plug in other supported decoders.
if (!header.hasCellBlockCodecClass()) return;
String className = header.getCellBlockCodecClass();
if (className == null || className.length() == 0) return;
try {
this.codec = (Codec)Class.forName(className).newInstance();
} catch (Exception e) {
throw new UnsupportedCellCodecException(className, e);
}
if (!header.hasCellBlockCompressorClass()) return;
className = header.getCellBlockCompressorClass();
try {
this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
} catch (Exception e) {
throw new UnsupportedCompressionCodecException(className, e);
}
}
/**
* Set up cipher for rpc encryption with Apache Commons Crypto
*
* @throws FatalConnectionException
*/
protected void setupCryptoCipher(final ConnectionHeader header,
RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
throws FatalConnectionException {
// If simple auth, return
if (saslServer == null) return;
// check if rpc encryption with Crypto AES
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY
.getSaslQop().equalsIgnoreCase(qop);
boolean isCryptoAesEncryption = isEncryption && conf.getBoolean(
"hbase.rpc.crypto.encryption.aes.enabled", false);
if (!isCryptoAesEncryption) return;
if (!header.hasRpcCryptoCipherTransformation()) return;
String transformation = header.getRpcCryptoCipherTransformation();
if (transformation == null || transformation.length() == 0) return;
// Negotiates AES based on complete saslServer.
// The Crypto metadata need to be encrypted and send to client.
Properties properties = new Properties();
// the property for SecureRandomFactory
properties.setProperty(CryptoRandomFactory.CLASSES_KEY,
conf.get("hbase.crypto.sasl.encryption.aes.crypto.random",
"org.apache.commons.crypto.random.JavaCryptoRandom"));
// the property for cipher class
properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
"org.apache.commons.crypto.cipher.JceCipher"));
int cipherKeyBits = conf.getInt(
"hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
// generate key and iv
if (cipherKeyBits % 8 != 0) {
throw new IllegalArgumentException("The AES cipher key size in bits" +
" should be a multiple of byte");
}
int len = cipherKeyBits / 8;
byte[] inKey = new byte[len];
byte[] outKey = new byte[len];
byte[] inIv = new byte[len];
byte[] outIv = new byte[len];
try {
// generate the cipher meta data with SecureRandom
CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties);
secureRandom.nextBytes(inKey);
secureRandom.nextBytes(outKey);
secureRandom.nextBytes(inIv);
secureRandom.nextBytes(outIv);
// create CryptoAES for server
cryptoAES = new CryptoAES(transformation, properties,
inKey, outKey, inIv, outIv);
// create SaslCipherMeta and send to client,
// for client, the [inKey, outKey], [inIv, outIv] should be reversed
RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder();
ccmBuilder.setTransformation(transformation);
ccmBuilder.setInIv(getByteString(outIv));
ccmBuilder.setInKey(getByteString(outKey));
ccmBuilder.setOutIv(getByteString(inIv));
ccmBuilder.setOutKey(getByteString(inKey));
chrBuilder.setCryptoCipherMeta(ccmBuilder);
useCryptoAesWrap = true;
} catch (GeneralSecurityException | IOException ex) {
throw new UnsupportedCryptoException(ex.getMessage(), ex);
}
}
private ByteString getByteString(byte[] bytes) {
// return singleton to reduce object allocation
return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
}
protected UserGroupInformation createUser(ConnectionHeader head) {
UserGroupInformation ugi = null;
if (!head.hasUserInfo()) {
return null;
}
UserInformation userInfoProto = head.getUserInfo();
String effectiveUser = null;
if (userInfoProto.hasEffectiveUser()) {
effectiveUser = userInfoProto.getEffectiveUser();
}
String realUser = null;
if (userInfoProto.hasRealUser()) {
realUser = userInfoProto.getRealUser();
}
if (effectiveUser != null) {
if (realUser != null) {
UserGroupInformation realUserUgi =
UserGroupInformation.createRemoteUser(realUser);
ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
} else {
ugi = UserGroupInformation.createRemoteUser(effectiveUser);
}
}
return ugi;
}
protected void disposeSasl() {
if (saslServer != null) {
try {
saslServer.dispose();
saslServer = null;
} catch (SaslException ignored) {
// Ignored. This is being disposed of anyway.
}
}
}
/**
* No protobuf encoding of raw sasl messages
*/
protected void doRawSaslReply(SaslStatus status, Writable rv,
String errorClass, String error) throws IOException {
ByteBufferOutputStream saslResponse = null;
DataOutputStream out = null;
try {
// In my testing, have noticed that sasl messages are usually
// in the ballpark of 100-200. That's why the initial capacity is 256.
saslResponse = new ByteBufferOutputStream(256);
out = new DataOutputStream(saslResponse);
out.writeInt(status.state); // write status
if (status == SaslStatus.SUCCESS) {
rv.write(out);
} else {
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
saslCall.sendResponseIfReady();
} finally {
if (saslResponse != null) {
saslResponse.close();
}
if (out != null) {
out.close();
}
}
}
public void saslReadAndProcess(ByteBuff saslToken) throws IOException,
InterruptedException {
if (saslContextEstablished) {
if (LOG.isTraceEnabled())
LOG.trace("Have read input token of size " + saslToken.limit()
+ " for processing by saslServer.unwrap()");
if (!useWrap) {
processOneRpc(saslToken);
} else {
byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
byte [] plaintextData;
if (useCryptoAesWrap) {
// unwrap with CryptoAES
plaintextData = cryptoAES.unwrap(b, 0, b.length);
} else {
plaintextData = saslServer.unwrap(b, 0, b.length);
}
processUnwrappedData(plaintextData);
}
} else {
byte[] replyToken;
try {
if (saslServer == null) {
switch (authMethod) {
case DIGEST:
if (secretManager == null) {
throw new AccessDeniedException(
"Server is not configured to do DIGEST authentication.");
}
saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
.getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
secretManager, this));
break;
default:
UserGroupInformation current = UserGroupInformation.getCurrentUser();
String fullName = current.getUserName();
if (LOG.isDebugEnabled()) {
LOG.debug("Kerberos principal name is " + fullName);
}
final String names[] = SaslUtil.splitKerberosName(fullName);
if (names.length != 3) {
throw new AccessDeniedException(
"Kerberos principal name does NOT have the expected "
+ "hostname part: " + fullName);
}
current.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws SaslException {
saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
.getMechanismName(), names[0], names[1],
HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
return null;
}
});
}
if (saslServer == null)
throw new AccessDeniedException(
"Unable to find SASL server implementation for "
+ authMethod.getMechanismName());
if (LOG.isDebugEnabled()) {
LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Have read input token of size " + saslToken.limit()
+ " for processing by saslServer.evaluateResponse()");
}
replyToken = saslServer
.evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes());
} catch (IOException e) {
IOException sendToClient = e;
Throwable cause = e;
while (cause != null) {
if (cause instanceof InvalidToken) {
sendToClient = (InvalidToken) cause;
break;
}
cause = cause.getCause();
}
doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
sendToClient.getLocalizedMessage());
metrics.authenticationFailure();
String clientIP = this.toString();
// attempting user could be null
AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
throw e;
}
if (replyToken != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Will send token of size " + replyToken.length
+ " from saslServer.");
}
doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
null);
}
if (saslServer.isComplete()) {
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
if (LOG.isDebugEnabled()) {
LOG.debug("SASL server context established. Authenticated client: "
+ ugi + ". Negotiated QoP is "
+ saslServer.getNegotiatedProperty(Sasl.QOP));
}
metrics.authenticationSuccess();
AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
saslContextEstablished = true;
}
}
}
private void processUnwrappedData(byte[] inBuf) throws IOException,
InterruptedException {
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
// Read all RPCs contained in the inBuf, even partial ones
while (true) {
int count;
if (unwrappedDataLengthBuffer.remaining() > 0) {
count = channelRead(ch, unwrappedDataLengthBuffer);
if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
return;
}
if (unwrappedData == null) {
unwrappedDataLengthBuffer.flip();
int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
if (LOG.isDebugEnabled())
LOG.debug("Received ping message");
unwrappedDataLengthBuffer.clear();
continue; // ping message
}
unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
}
count = channelRead(ch, unwrappedData);
if (count <= 0 || unwrappedData.remaining() > 0)
return;
if (unwrappedData.remaining() == 0) {
unwrappedDataLengthBuffer.clear();
unwrappedData.flip();
processOneRpc(new SingleByteBuff(unwrappedData));
unwrappedData = null;
}
}
}
public void processOneRpc(ByteBuff buf) throws IOException,
InterruptedException {
if (connectionHeaderRead) {
processRequest(buf);
} else {
processConnectionHeader(buf);
this.connectionHeaderRead = true;
if (!authorizeConnection()) {
// Throw FatalConnectionException wrapping ACE so client does right thing and closes
// down the connection instead of trying to read a non-existent return.
throw new AccessDeniedException("Connection from " + this + " for service " +
connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
}
this.user = userProvider.create(this.ugi);
}
}
protected boolean authorizeConnection() throws IOException {
try {
// If auth method is DIGEST, the token was obtained by the
// real user for the effective user, therefore not required to
// authorize real user. doAs is allowed only for simple or kerberos
// authentication
if (ugi != null && ugi.getRealUser() != null
&& (authMethod != AuthMethod.DIGEST)) {
ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
}
authorize(ugi, connectionHeader, getHostInetAddress());
metrics.authorizationSuccess();
} catch (AuthorizationException ae) {
if (LOG.isDebugEnabled()) {
LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
}
metrics.authorizationFailure();
setupResponse(authFailedResponse, authFailedCall,
new AccessDeniedException(ae), ae.getMessage());
authFailedCall.sendResponseIfReady();
return false;
}
return true;
}
// Reads the connection header following version
protected void processConnectionHeader(ByteBuff buf) throws IOException {
if (buf.hasArray()) {
this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
} else {
CodedInputStream cis = UnsafeByteOperations
.unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
cis.enableAliasing(true);
this.connectionHeader = ConnectionHeader.parseFrom(cis);
}
String serviceName = connectionHeader.getServiceName();
if (serviceName == null) throw new EmptyServiceNameException();
this.service = getService(services, serviceName);
if (this.service == null) throw new UnknownServiceException(serviceName);
setupCellBlockCodecs(this.connectionHeader);
RPCProtos.ConnectionHeaderResponse.Builder chrBuilder =
RPCProtos.ConnectionHeaderResponse.newBuilder();
setupCryptoCipher(this.connectionHeader, chrBuilder);
responseConnectionHeader(chrBuilder);
UserGroupInformation protocolUser = createUser(connectionHeader);
if (!useSasl) {
ugi = protocolUser;
if (ugi != null) {
ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
}
// audit logging for SASL authenticated users happens in saslReadAndProcess()
if (authenticatedWithFallback) {
LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
+ " connecting from " + getHostAddress());
}
AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
} else {
// user is authenticated
ugi.setAuthenticationMethod(authMethod.authenticationMethod);
//Now we check if this is a proxy user case. If the protocol user is
//different from the 'user', it is a proxy user scenario. However,
//this is not allowed if user authenticated with DIGEST.
if ((protocolUser != null)
&& (!protocolUser.getUserName().equals(ugi.getUserName()))) {
if (authMethod == AuthMethod.DIGEST) {
// Not allowed to doAs if token authentication is used
throw new AccessDeniedException("Authenticated user (" + ugi
+ ") doesn't match what the client claims to be ("
+ protocolUser + ")");
} else {
// Effective user can be different from authenticated user
// for simple auth or kerberos auth
// The user is the real user. Now we create a proxy user
UserGroupInformation realUser = ugi;
ugi = UserGroupInformation.createProxyUser(protocolUser
.getUserName(), realUser);
// Now the user is a proxy user, set Authentication method Proxy.
ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
}
}
}
if (connectionHeader.hasVersionInfo()) {
// see if this connection will support RetryImmediatelyException
retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
+ " with version info: "
+ TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
} else {
AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
+ " with unknown version info");
}
}
private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
throws FatalConnectionException {
// Response the connection header if Crypto AES is enabled
if (!chrBuilder.hasCryptoCipherMeta()) return;
try {
byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray();
// encrypt the Crypto AES cipher meta data with sasl server, and send to client
byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length);
doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, unwrapped.length));
} catch (IOException ex) {
throw new UnsupportedCryptoException(ex.getMessage(), ex);
}
}
/**
* Send the response for connection header
*/
private void doConnectionHeaderResponse(byte[] wrappedCipherMetaData)
throws IOException {
ByteBufferOutputStream response = null;
DataOutputStream out = null;
try {
response = new ByteBufferOutputStream(wrappedCipherMetaData.length + 4);
out = new DataOutputStream(response);
out.writeInt(wrappedCipherMetaData.length);
out.write(wrappedCipherMetaData);
setConnectionHeaderResponseCall.setConnectionHeaderResponse(response
.getByteBuffer());
setConnectionHeaderResponseCall.sendResponseIfReady();
} finally {
if (out != null) {
out.close();
}
if (response != null) {
response.close();
}
}
}
/**
* @param buf
* Has the request header and the request param and optionally
* encoded data buffer all in this one array.
* @throws IOException
* @throws InterruptedException
*/
protected void processRequest(ByteBuff buf) throws IOException,
InterruptedException {
long totalRequestSize = buf.limit();
int offset = 0;
// Here we read in the header. We avoid having pb
// do its default 4k allocation for CodedInputStream. We force it to use
// backing array.
CodedInputStream cis;
if (buf.hasArray()) {
cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit())
.newCodedInput();
} else {
cis = UnsafeByteOperations.unsafeWrap(
new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit())
.newCodedInput();
}
cis.enableAliasing(true);
int headerSize = cis.readRawVarint32();
offset = cis.getTotalBytesRead();
Message.Builder builder = RequestHeader.newBuilder();
ProtobufUtil.mergeFrom(builder, cis, headerSize);
RequestHeader header = (RequestHeader) builder.build();
offset += headerSize;
int id = header.getCallId();
if (LOG.isTraceEnabled()) {
LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
+ " totalRequestSize: " + totalRequestSize + " bytes");
}
// Enforcing the call queue size, this triggers a retry in the client
// This is a bit late to be doing this check - we have already read in the
// total request.
if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) {
final ServerCall callTooBig = createCall(id, this.service, null,
null, null, null, this, totalRequestSize, null, null, 0,
this.callCleanup);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + server.getServerName()
+ ", is hbase.ipc.server.max.callqueue.size too small?");
callTooBig.sendResponseIfReady();
return;
}
MethodDescriptor md = null;
Message param = null;
CellScanner cellScanner = null;
try {
if (header.hasRequestParam() && header.getRequestParam()) {
md = this.service.getDescriptorForType().findMethodByName(
header.getMethodName());
if (md == null)
throw new UnsupportedOperationException(header.getMethodName());
builder = this.service.getRequestPrototype(md).newBuilderForType();
cis.resetSizeCounter();
int paramSize = cis.readRawVarint32();
offset += cis.getTotalBytesRead();
if (builder != null) {
ProtobufUtil.mergeFrom(builder, cis, paramSize);
param = builder.build();
}
offset += paramSize;
} else {
// currently header must have request param, so we directly throw
// exception here
String msg = "Invalid request header: "
+ TextFormat.shortDebugString(header)
+ ", should have param set in it";
LOG.warn(msg);
throw new DoNotRetryIOException(msg);
}
if (header.hasCellBlockMeta()) {
buf.position(offset);
ByteBuff dup = buf.duplicate();
dup.limit(offset + header.getCellBlockMeta().getLength());
cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(
this.codec, this.compressionCodec, dup);
}
} catch (Throwable t) {
InetSocketAddress address = getListenerAddress();
String msg = (address != null ? address : "(channel closed)")
+ " is unable to read call parameter from client "
+ getHostAddress();
LOG.warn(msg, t);
metrics.exception(t);
// probably the hbase hadoop version does not match the running hadoop
// version
if (t instanceof LinkageError) {
t = new DoNotRetryIOException(t);
}
// If the method is not present on the server, do not retry.
if (t instanceof UnsupportedOperationException) {
t = new DoNotRetryIOException(t);
}
final ServerCall readParamsFailedCall = createCall(id,
this.service, null, null, null, null, this, totalRequestSize, null,
null, 0, this.callCleanup);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, readParamsFailedCall, t,
msg + "; " + t.getMessage());
readParamsFailedCall.sendResponseIfReady();
return;
}
TraceInfo traceInfo = header.hasTraceInfo() ? new TraceInfo(header
.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
: null;
int timeout = 0;
if (header.hasTimeout() && header.getTimeout() > 0) {
timeout = Math.max(minClientRequestTimeout, header.getTimeout());
}
ServerCall call = createCall(id, this.service, md, header, param,
cellScanner, this, totalRequestSize, traceInfo, this.addr, timeout,
this.callCleanup);
if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
callQueueSizeInBytes.add(-1 * call.getSize());
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + server.getServerName()
+ ", too many items queued ?");
call.sendResponseIfReady();
}
}
public abstract boolean isConnectionOpen();
public abstract ServerCall createCall(int id, final BlockingService service,
final MethodDescriptor md, RequestHeader header, Message param,
CellScanner cellScanner, Connection connection, long size,
TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
CallCleanup reqCleanup);
}
/**
* Datastructure for passing a {@link BlockingService} and its associated class of
* protobuf service interface. For example, a server that fielded what is defined
@@ -1122,7 +348,7 @@
* @param error error message, if the call failed
* @throws IOException
*/
protected void setupResponse(ByteArrayOutputStream response, ServerCall call, Throwable t,
protected void setupResponse(ByteArrayOutputStream response, ServerCall<?> call, Throwable t,
String error) throws IOException {
if (response != null) response.reset();
call.setResponse(null, null, t, error);
@@ -1574,44 +800,4 @@
public void setRsRpcServices(RSRpcServices rsRpcServices) {
this.rsRpcServices = rsRpcServices;
}
protected static class ByteBuffByteInput extends ByteInput {
private ByteBuff buf;
private int offset;
private int length;
ByteBuffByteInput(ByteBuff buf, int offset, int length) {
this.buf = buf;
this.offset = offset;
this.length = length;
}
@Override
public byte read(int offset) {
return this.buf.get(getAbsoluteOffset(offset));
}
private int getAbsoluteOffset(int offset) {
return this.offset + offset;
}
@Override
public int read(int offset, byte[] out, int outOffset, int len) {
this.buf.get(getAbsoluteOffset(offset), out, outOffset, len);
return len;
}
@Override
public int read(int offset, ByteBuffer out) {
int len = out.remaining();
this.buf.get(out, getAbsoluteOffset(offset), len);
return len;
}
@Override
public int size() {
return this.length;
}
}
}
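Among the code moved out of RpcServer above, processUnwrappedData() decodes the SASL-unwrapped byte stream as four-byte big-endian length-prefixed frames, and a length equal to RpcClient.PING_CALL_ID marks a payload-less ping. A schematic decoder for that framing; the PING_CALL_ID value below is a placeholder, not the real constant.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class UnwrappedFrameSketch {
  static final int PING_CALL_ID = -1; // placeholder for RpcClient.PING_CALL_ID

  public static void main(String[] args) throws IOException {
    // One two-byte frame followed by one ping marker (0xFFFFFFFF == -1 here).
    byte[] stream = { 0, 0, 0, 2, 'h', 'i', (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF };
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(stream));
    while (in.available() >= 4) {
      int length = in.readInt();
      if (length == PING_CALL_ID) {
        System.out.println("ping");
        continue;
      }
      byte[] payload = new byte[length];
      in.readFully(payload);
      System.out.println("frame of " + length + " bytes");
    }
  }
}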

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool; import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.ipc.RpcServer.Connection;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
@@ -52,7 +51,7 @@ import org.apache.htrace.TraceInfo;
* the result. * the result.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
abstract class ServerCall implements RpcCall { abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall {
protected final int id; // the client's call id protected final int id; // the client's call id
protected final BlockingService service; protected final BlockingService service;
@@ -61,7 +60,7 @@ abstract class ServerCall implements RpcCall {
protected Message param; // the parameter passed protected Message param; // the parameter passed
// Optional cell data passed outside of protobufs. // Optional cell data passed outside of protobufs.
protected final CellScanner cellScanner; protected final CellScanner cellScanner;
protected final Connection connection; // connection to client protected final T connection; // connection to client
protected final long receiveTime; // the time received when response is null protected final long receiveTime; // the time received when response is null
// the time served when response is not null // the time served when response is not null
protected final int timeout; protected final int timeout;
@@ -96,7 +95,7 @@ abstract class ServerCall implements RpcCall {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
justification="Can't figure why this complaint is happening... see below") justification="Can't figure why this complaint is happening... see below")
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header, ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, Connection connection, long size, TraceInfo tinfo, Message param, CellScanner cellScanner, T connection, long size, TraceInfo tinfo,
InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir, InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) { CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
this.id = id; this.id = id;
View File
@@ -0,0 +1,852 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.util.Properties;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.commons.crypto.cipher.CryptoCipherFactory;
import org.apache.commons.crypto.random.CryptoRandom;
import org.apache.commons.crypto.random.CryptoRandomFactory;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.htrace.TraceInfo;
/** Reads calls from a connection and queues them for handling. */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="VO_VOLATILE_INCREMENT",
justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
abstract class ServerRpcConnection implements Closeable {
/** */
protected final RpcServer rpcServer;
// Whether the initial preamble with version and magic has been read.
protected boolean connectionPreambleRead = false;
// Whether the connection header has been read.
protected boolean connectionHeaderRead = false;
protected CallCleanup callCleanup;
// Cache the remote host & port info so that even if the socket is
// disconnected, we can say where it used to connect to.
protected String hostAddress;
protected int remotePort;
protected InetAddress addr;
protected ConnectionHeader connectionHeader;
/**
* Codec the client asked us to use.
*/
protected Codec codec;
/**
* Compression codec the client asked us to use.
*/
protected CompressionCodec compressionCodec;
protected BlockingService service;
protected AuthMethod authMethod;
protected boolean saslContextEstablished;
protected boolean skipInitialSaslHandshake;
private ByteBuffer unwrappedData;
// When is this set? FindBugs wants to know! Says NP
private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
protected boolean useSasl;
protected SaslServer saslServer;
protected CryptoAES cryptoAES;
protected boolean useWrap = false;
protected boolean useCryptoAesWrap = false;
// Fake 'call' for failed authorization response
protected static final int AUTHORIZATION_FAILED_CALLID = -1;
protected ServerCall<?> authFailedCall;
protected ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
protected static final int SASL_CALLID = -33;
protected ServerCall<?> saslCall;
// Fake 'call' for connection header response
protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
protected ServerCall<?> setConnectionHeaderResponseCall;
// was authentication allowed with a fallback to simple auth
protected boolean authenticatedWithFallback;
protected boolean retryImmediatelySupported = false;
private UserGroupInformation attemptingUser = null; // user name before auth
protected User user = null;
protected UserGroupInformation ugi = null;
public ServerRpcConnection(RpcServer rpcServer) {
this.rpcServer = rpcServer;
this.callCleanup = null;
}
@Override
public String toString() {
return getHostAddress() + ":" + remotePort;
}
public String getHostAddress() {
return hostAddress;
}
public InetAddress getHostInetAddress() {
return addr;
}
public int getRemotePort() {
return remotePort;
}
public VersionInfo getVersionInfo() {
if (connectionHeader.hasVersionInfo()) {
return connectionHeader.getVersionInfo();
}
return null;
}
protected String getFatalConnectionString(final int version, final byte authByte) {
return "serverVersion=" + RpcServer.CURRENT_VERSION +
", clientVersion=" + version + ", authMethod=" + authByte +
", authSupported=" + (authMethod != null) + " from " + toString();
}
protected UserGroupInformation getAuthorizedUgi(String authorizedId)
throws IOException {
UserGroupInformation authorizedUgi;
if (authMethod == AuthMethod.DIGEST) {
TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
this.rpcServer.secretManager);
authorizedUgi = tokenId.getUser();
if (authorizedUgi == null) {
throw new AccessDeniedException(
"Can't retrieve username from tokenIdentifier.");
}
authorizedUgi.addTokenIdentifier(tokenId);
} else {
authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
}
authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
return authorizedUgi;
}
/**
* Set up cell block codecs
* @throws FatalConnectionException
*/
protected void setupCellBlockCodecs(final ConnectionHeader header)
throws FatalConnectionException {
// TODO: Plug in other supported decoders.
if (!header.hasCellBlockCodecClass()) return;
String className = header.getCellBlockCodecClass();
if (className == null || className.length() == 0) return;
try {
this.codec = (Codec)Class.forName(className).newInstance();
} catch (Exception e) {
throw new UnsupportedCellCodecException(className, e);
}
if (!header.hasCellBlockCompressorClass()) return;
className = header.getCellBlockCompressorClass();
try {
this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
} catch (Exception e) {
throw new UnsupportedCompressionCodecException(className, e);
}
}
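// An illustrative sketch, not part of the patch: a ConnectionHeader that would
// exercise both branches above. The helper name and the codec/compressor values
// are examples only (KeyValueCodec is HBase's default codec; GzipCodec is a
// stock Hadoop compression codec).
static ConnectionHeader exampleCellBlockHeader() {
  return ConnectionHeader.newBuilder()
      .setServiceName("ClientService")
      .setCellBlockCodecClass("org.apache.hadoop.hbase.codec.KeyValueCodec")
      .setCellBlockCompressorClass("org.apache.hadoop.io.compress.GzipCodec")
      .build();
}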
/**
* Set up cipher for rpc encryption with Apache Commons Crypto
*
* @throws FatalConnectionException
*/
protected void setupCryptoCipher(final ConnectionHeader header,
RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
throws FatalConnectionException {
// If simple auth, return
if (saslServer == null) return;
// check if rpc encryption with Crypto AES
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY
.getSaslQop().equalsIgnoreCase(qop);
boolean isCryptoAesEncryption = isEncryption && this.rpcServer.conf.getBoolean(
"hbase.rpc.crypto.encryption.aes.enabled", false);
if (!isCryptoAesEncryption) return;
if (!header.hasRpcCryptoCipherTransformation()) return;
String transformation = header.getRpcCryptoCipherTransformation();
if (transformation == null || transformation.length() == 0) return;
// Negotiate AES based on the completed saslServer.
// The crypto metadata needs to be encrypted and sent to the client.
Properties properties = new Properties();
// the property for SecureRandomFactory
properties.setProperty(CryptoRandomFactory.CLASSES_KEY,
this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random",
"org.apache.commons.crypto.random.JavaCryptoRandom"));
// the property for cipher class
properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
"org.apache.commons.crypto.cipher.JceCipher"));
int cipherKeyBits = this.rpcServer.conf.getInt(
"hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
// generate key and iv
if (cipherKeyBits % 8 != 0) {
throw new IllegalArgumentException("The AES cipher key size in bits" +
" should be a multiple of byte");
}
int len = cipherKeyBits / 8;
byte[] inKey = new byte[len];
byte[] outKey = new byte[len];
byte[] inIv = new byte[len];
byte[] outIv = new byte[len];
try {
// generate the cipher meta data with SecureRandom
CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties);
secureRandom.nextBytes(inKey);
secureRandom.nextBytes(outKey);
secureRandom.nextBytes(inIv);
secureRandom.nextBytes(outIv);
// create CryptoAES for server
cryptoAES = new CryptoAES(transformation, properties,
inKey, outKey, inIv, outIv);
// create SaslCipherMeta and send to client,
// for client, the [inKey, outKey], [inIv, outIv] should be reversed
RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder();
ccmBuilder.setTransformation(transformation);
ccmBuilder.setInIv(getByteString(outIv));
ccmBuilder.setInKey(getByteString(outKey));
ccmBuilder.setOutIv(getByteString(inIv));
ccmBuilder.setOutKey(getByteString(inKey));
chrBuilder.setCryptoCipherMeta(ccmBuilder);
useCryptoAesWrap = true;
} catch (GeneralSecurityException | IOException ex) {
throw new UnsupportedCryptoException(ex.getMessage(), ex);
}
}
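// An illustrative sketch, not part of the patch: the server-side configuration
// that lets setupCryptoCipher() negotiate AES wrapping. The keys are the ones
// read above and the values shown are the defaults; a negotiated SASL QOP of
// "auth-conf" (PRIVACY) is also required. The helper name is made up and
// assumes org.apache.hadoop.conf.Configuration.
static Configuration exampleAesConfig() {
  Configuration conf = new Configuration();
  conf.setBoolean("hbase.rpc.crypto.encryption.aes.enabled", true);
  conf.setInt("hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
  conf.set("hbase.rpc.crypto.encryption.aes.cipher.class",
      "org.apache.commons.crypto.cipher.JceCipher");
  return conf;
}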
private ByteString getByteString(byte[] bytes) {
// return singleton to reduce object allocation
return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
}
protected UserGroupInformation createUser(ConnectionHeader head) {
UserGroupInformation ugi = null;
if (!head.hasUserInfo()) {
return null;
}
UserInformation userInfoProto = head.getUserInfo();
String effectiveUser = null;
if (userInfoProto.hasEffectiveUser()) {
effectiveUser = userInfoProto.getEffectiveUser();
}
String realUser = null;
if (userInfoProto.hasRealUser()) {
realUser = userInfoProto.getRealUser();
}
if (effectiveUser != null) {
if (realUser != null) {
UserGroupInformation realUserUgi =
UserGroupInformation.createRemoteUser(realUser);
ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
} else {
ugi = UserGroupInformation.createRemoteUser(effectiveUser);
}
}
return ugi;
}
protected void disposeSasl() {
if (saslServer != null) {
try {
saslServer.dispose();
saslServer = null;
} catch (SaslException ignored) {
// Ignored. This is being disposed of anyway.
}
}
}
/**
* No protobuf encoding of raw sasl messages
*/
protected void doRawSaslReply(SaslStatus status, Writable rv,
String errorClass, String error) throws IOException {
ByteBufferOutputStream saslResponse = null;
DataOutputStream out = null;
try {
// In my testing, I have noticed that SASL messages are usually
// in the ballpark of 100-200 bytes. That's why the initial capacity is 256.
saslResponse = new ByteBufferOutputStream(256);
out = new DataOutputStream(saslResponse);
out.writeInt(status.state); // write status
if (status == SaslStatus.SUCCESS) {
rv.write(out);
} else {
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
}
saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
saslCall.sendResponseIfReady();
} finally {
if (saslResponse != null) {
saslResponse.close();
}
if (out != null) {
out.close();
}
}
}
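// An illustrative sketch, not part of the patch: the matching client-side decode
// for the raw frame written above -- an int status code, then either a Writable
// token on success or two WritableUtils strings describing the error. The helper
// name is made up; assumes a java.io.DataInputStream over the socket.
static byte[] readRawSaslReply(java.io.DataInputStream in) throws IOException {
  int state = in.readInt(); // SaslStatus.state written by doRawSaslReply()
  if (state == SaslStatus.SUCCESS.state) {
    BytesWritable token = new BytesWritable();
    token.readFields(in); // the next server token to evaluate
    return token.copyBytes();
  }
  String errorClass = WritableUtils.readString(in);
  String error = WritableUtils.readString(in);
  throw new IOException(errorClass + ": " + error);
}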
public void saslReadAndProcess(ByteBuff saslToken) throws IOException,
InterruptedException {
if (saslContextEstablished) {
if (RpcServer.LOG.isTraceEnabled())
RpcServer.LOG.trace("Have read input token of size " + saslToken.limit()
+ " for processing by saslServer.unwrap()");
if (!useWrap) {
processOneRpc(saslToken);
} else {
byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
byte [] plaintextData;
if (useCryptoAesWrap) {
// unwrap with CryptoAES
plaintextData = cryptoAES.unwrap(b, 0, b.length);
} else {
plaintextData = saslServer.unwrap(b, 0, b.length);
}
processUnwrappedData(plaintextData);
}
} else {
byte[] replyToken;
try {
if (saslServer == null) {
switch (authMethod) {
case DIGEST:
if (this.rpcServer.secretManager == null) {
throw new AccessDeniedException(
"Server is not configured to do DIGEST authentication.");
}
saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
.getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
this.rpcServer.secretManager, ugi -> attemptingUser = ugi));
break;
default:
UserGroupInformation current = UserGroupInformation.getCurrentUser();
String fullName = current.getUserName();
if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug("Kerberos principal name is " + fullName);
}
final String names[] = SaslUtil.splitKerberosName(fullName);
if (names.length != 3) {
throw new AccessDeniedException(
"Kerberos principal name does NOT have the expected "
+ "hostname part: " + fullName);
}
current.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws SaslException {
saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
.getMechanismName(), names[0], names[1],
HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
return null;
}
});
}
if (saslServer == null)
throw new AccessDeniedException(
"Unable to find SASL server implementation for "
+ authMethod.getMechanismName());
if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
}
}
if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug("Have read input token of size " + saslToken.limit()
+ " for processing by saslServer.evaluateResponse()");
}
replyToken = saslServer
.evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes());
} catch (IOException e) {
IOException sendToClient = e;
Throwable cause = e;
while (cause != null) {
if (cause instanceof InvalidToken) {
sendToClient = (InvalidToken) cause;
break;
}
cause = cause.getCause();
}
doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
sendToClient.getLocalizedMessage());
this.rpcServer.metrics.authenticationFailure();
String clientIP = this.toString();
// attempting user could be null
RpcServer.AUDITLOG.warn(RpcServer.AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
throw e;
}
if (replyToken != null) {
if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug("Will send token of size " + replyToken.length
+ " from saslServer.");
}
doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
null);
}
if (saslServer.isComplete()) {
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug("SASL server context established. Authenticated client: "
+ ugi + ". Negotiated QoP is "
+ saslServer.getNegotiatedProperty(Sasl.QOP));
}
this.rpcServer.metrics.authenticationSuccess();
RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
saslContextEstablished = true;
}
}
}
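// An illustrative sketch, not part of the patch: once saslContextEstablished is
// true and the negotiated QOP is auth-int or auth-conf, each payload arriving
// here is the output of SaslClient.wrap() and is undone by the unwrap() above.
// The helper name is made up.
static byte[] saslRoundTrip(javax.security.sasl.SaslClient client, SaslServer server,
    byte[] plain) throws SaslException {
  byte[] onWire = client.wrap(plain, 0, plain.length); // client side
  return server.unwrap(onWire, 0, onWire.length); // server side, as above
}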
private void processUnwrappedData(byte[] inBuf) throws IOException,
InterruptedException {
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
// Read all RPCs contained in the inBuf, even partial ones
while (true) {
int count;
if (unwrappedDataLengthBuffer.remaining() > 0) {
count = this.rpcServer.channelRead(ch, unwrappedDataLengthBuffer);
if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
return;
}
if (unwrappedData == null) {
unwrappedDataLengthBuffer.flip();
int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
if (RpcServer.LOG.isDebugEnabled())
RpcServer.LOG.debug("Received ping message");
unwrappedDataLengthBuffer.clear();
continue; // ping message
}
unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
}
count = this.rpcServer.channelRead(ch, unwrappedData);
if (count <= 0 || unwrappedData.remaining() > 0)
return;
if (unwrappedData.remaining() == 0) {
unwrappedDataLengthBuffer.clear();
unwrappedData.flip();
processOneRpc(new SingleByteBuff(unwrappedData));
unwrappedData = null;
}
}
}
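// An illustrative sketch, not part of the patch: the framing this loop consumes.
// Each unwrapped chunk is a sequence of [4-byte big-endian length][body] records;
// a length equal to RpcClient.PING_CALL_ID marks a ping and carries no body.
// The helper name is made up.
static byte[] frameOneRpc(byte[] rpc) {
  ByteBuffer bb = ByteBuffer.allocate(4 + rpc.length);
  bb.putInt(rpc.length); // read back via unwrappedDataLengthBuffer.getInt()
  bb.put(rpc);
  return bb.array();
}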
public void processOneRpc(ByteBuff buf) throws IOException,
InterruptedException {
if (connectionHeaderRead) {
processRequest(buf);
} else {
processConnectionHeader(buf);
this.connectionHeaderRead = true;
if (!authorizeConnection()) {
// Throw FatalConnectionException wrapping ACE so the client does the right thing and closes
// down the connection instead of trying to read a non-existent return.
throw new AccessDeniedException("Connection from " + this + " for service " +
connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
}
this.user = this.rpcServer.userProvider.create(this.ugi);
}
}
protected boolean authorizeConnection() throws IOException {
try {
// If auth method is DIGEST, the token was obtained by the
// real user for the effective user, therefore not required to
// authorize real user. doAs is allowed only for simple or kerberos
// authentication
if (ugi != null && ugi.getRealUser() != null
&& (authMethod != AuthMethod.DIGEST)) {
ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf);
}
this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress());
this.rpcServer.metrics.authorizationSuccess();
} catch (AuthorizationException ae) {
if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
}
this.rpcServer.metrics.authorizationFailure();
this.rpcServer.setupResponse(authFailedResponse, authFailedCall,
new AccessDeniedException(ae), ae.getMessage());
authFailedCall.sendResponseIfReady();
return false;
}
return true;
}
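// An illustrative sketch, not part of the patch: the ProxyUsers.authorize() call
// above is driven by Hadoop's standard superuser settings. For example, to let a
// hypothetical user "rpcproxy" impersonate anyone connecting from one gateway
// host (user, host, and helper name are made up; assumes
// org.apache.hadoop.conf.Configuration):
static void exampleProxyUserConfig(Configuration conf) {
  conf.set("hadoop.proxyuser.rpcproxy.groups", "*");
  conf.set("hadoop.proxyuser.rpcproxy.hosts", "gateway.example.com");
  ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
}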
// Reads the connection header following version
protected void processConnectionHeader(ByteBuff buf) throws IOException {
if (buf.hasArray()) {
this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
} else {
CodedInputStream cis = UnsafeByteOperations
.unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
cis.enableAliasing(true);
this.connectionHeader = ConnectionHeader.parseFrom(cis);
}
String serviceName = connectionHeader.getServiceName();
if (serviceName == null) throw new EmptyServiceNameException();
this.service = RpcServer.getService(this.rpcServer.services, serviceName);
if (this.service == null) throw new UnknownServiceException(serviceName);
setupCellBlockCodecs(this.connectionHeader);
RPCProtos.ConnectionHeaderResponse.Builder chrBuilder =
RPCProtos.ConnectionHeaderResponse.newBuilder();
setupCryptoCipher(this.connectionHeader, chrBuilder);
responseConnectionHeader(chrBuilder);
UserGroupInformation protocolUser = createUser(connectionHeader);
if (!useSasl) {
ugi = protocolUser;
if (ugi != null) {
ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
}
// audit logging for SASL authenticated users happens in saslReadAndProcess()
if (authenticatedWithFallback) {
RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
+ " connecting from " + getHostAddress());
}
RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
} else {
// user is authenticated
ugi.setAuthenticationMethod(authMethod.authenticationMethod);
//Now we check if this is a proxy user case. If the protocol user is
//different from the 'user', it is a proxy user scenario. However,
//this is not allowed if user authenticated with DIGEST.
if ((protocolUser != null)
&& (!protocolUser.getUserName().equals(ugi.getUserName()))) {
if (authMethod == AuthMethod.DIGEST) {
// Not allowed to doAs if token authentication is used
throw new AccessDeniedException("Authenticated user (" + ugi
+ ") doesn't match what the client claims to be ("
+ protocolUser + ")");
} else {
// Effective user can be different from authenticated user
// for simple auth or kerberos auth
// The user is the real user. Now we create a proxy user
UserGroupInformation realUser = ugi;
ugi = UserGroupInformation.createProxyUser(protocolUser
.getUserName(), realUser);
// Now the user is a proxy user, set Authentication method Proxy.
ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
}
}
}
if (connectionHeader.hasVersionInfo()) {
// see if this connection will support RetryImmediatelyException
retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
RpcServer.AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
+ " with version info: "
+ TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
} else {
RpcServer.AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
+ " with unknown version info");
}
}
private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
throws FatalConnectionException {
// Respond to the connection header if Crypto AES is enabled
if (!chrBuilder.hasCryptoCipherMeta()) return;
try {
byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray();
// encrypt the Crypto AES cipher meta data with sasl server, and send to client
byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length);
doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, unwrapped.length));
} catch (IOException ex) {
throw new UnsupportedCryptoException(ex.getMessage(), ex);
}
}
/**
* Send the response for connection header
*/
private void doConnectionHeaderResponse(byte[] wrappedCipherMetaData)
throws IOException {
ByteBufferOutputStream response = null;
DataOutputStream out = null;
try {
response = new ByteBufferOutputStream(wrappedCipherMetaData.length + 4);
out = new DataOutputStream(response);
out.writeInt(wrappedCipherMetaData.length);
out.write(wrappedCipherMetaData);
setConnectionHeaderResponseCall.setConnectionHeaderResponse(response
.getByteBuffer());
setConnectionHeaderResponseCall.sendResponseIfReady();
} finally {
if (out != null) {
out.close();
}
if (response != null) {
response.close();
}
}
}
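// An illustrative sketch, not part of the patch: what a client would do with the
// frame written above -- read the outer 4-byte length, unwrap with its SaslClient,
// then parse the inner [4-byte length][ConnectionHeaderResponse] payload. The
// helper name is made up; assumes a java.io.DataInputStream over the socket.
static RPCProtos.ConnectionHeaderResponse readHeaderResponse(java.io.DataInputStream in,
    javax.security.sasl.SaslClient client) throws IOException, SaslException {
  byte[] wrapped = new byte[in.readInt()];
  in.readFully(wrapped);
  byte[] unwrapped = client.unwrap(wrapped, 0, wrapped.length);
  int innerLen = ByteBuffer.wrap(unwrapped).getInt(); // inner length prefix
  return RPCProtos.ConnectionHeaderResponse.parseFrom(
      ByteString.copyFrom(unwrapped, 4, innerLen));
}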
/**
* @param buf
* Has the request header and the request param and optionally
* encoded data buffer all in this one array.
* @throws IOException
* @throws InterruptedException
*/
protected void processRequest(ByteBuff buf) throws IOException,
InterruptedException {
long totalRequestSize = buf.limit();
int offset = 0;
// Here we read in the header. We avoid having protobuf do its default 4k
// allocation for CodedInputStream by forcing it to use the backing array.
CodedInputStream cis;
if (buf.hasArray()) {
cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit()).newCodedInput();
} else {
cis = UnsafeByteOperations
.unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
}
cis.enableAliasing(true);
int headerSize = cis.readRawVarint32();
offset = cis.getTotalBytesRead();
Message.Builder builder = RequestHeader.newBuilder();
ProtobufUtil.mergeFrom(builder, cis, headerSize);
RequestHeader header = (RequestHeader) builder.build();
offset += headerSize;
int id = header.getCallId();
if (RpcServer.LOG.isTraceEnabled()) {
RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
+ " totalRequestSize: " + totalRequestSize + " bytes");
}
// Enforce the call queue size; rejecting the call here triggers a retry in the
// client. This is a bit late to be doing this check - we have already read in
// the total request.
if ((totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) {
final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null,
totalRequestSize, null, null, 0, this.callCleanup);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
this.rpcServer.setupResponse(responseBuffer, callTooBig, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + this.rpcServer.server.getServerName()
+ ", is hbase.ipc.server.max.callqueue.size too small?");
callTooBig.sendResponseIfReady();
return;
}
MethodDescriptor md = null;
Message param = null;
CellScanner cellScanner = null;
try {
if (header.hasRequestParam() && header.getRequestParam()) {
md = this.service.getDescriptorForType().findMethodByName(
header.getMethodName());
if (md == null)
throw new UnsupportedOperationException(header.getMethodName());
builder = this.service.getRequestPrototype(md).newBuilderForType();
cis.resetSizeCounter();
int paramSize = cis.readRawVarint32();
offset += cis.getTotalBytesRead();
if (builder != null) {
ProtobufUtil.mergeFrom(builder, cis, paramSize);
param = builder.build();
}
offset += paramSize;
} else {
// currently header must have request param, so we directly throw
// exception here
String msg = "Invalid request header: "
+ TextFormat.shortDebugString(header)
+ ", should have param set in it";
RpcServer.LOG.warn(msg);
throw new DoNotRetryIOException(msg);
}
if (header.hasCellBlockMeta()) {
buf.position(offset);
ByteBuff dup = buf.duplicate();
dup.limit(offset + header.getCellBlockMeta().getLength());
cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(
this.codec, this.compressionCodec, dup);
}
} catch (Throwable t) {
InetSocketAddress address = this.rpcServer.getListenerAddress();
String msg = (address != null ? address : "(channel closed)")
+ " is unable to read call parameter from client "
+ getHostAddress();
RpcServer.LOG.warn(msg, t);
this.rpcServer.metrics.exception(t);
// probably the hbase hadoop version does not match the running hadoop
// version
if (t instanceof LinkageError) {
t = new DoNotRetryIOException(t);
}
// If the method is not present on the server, do not retry.
if (t instanceof UnsupportedOperationException) {
t = new DoNotRetryIOException(t);
}
final ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null,
totalRequestSize, null, null, 0, this.callCleanup);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
this.rpcServer.setupResponse(responseBuffer, readParamsFailedCall, t,
msg + "; " + t.getMessage());
readParamsFailedCall.sendResponseIfReady();
return;
}
TraceInfo traceInfo = header.hasTraceInfo() ? new TraceInfo(header
.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
: null;
int timeout = 0;
if (header.hasTimeout() && header.getTimeout() > 0) {
timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout());
}
ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner, totalRequestSize,
traceInfo, this.addr, timeout, this.callCleanup);
if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
this.rpcServer.setupResponse(responseBuffer, call, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + this.rpcServer.server.getServerName()
+ ", too many items queued ?");
call.sendResponseIfReady();
}
}
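// An illustrative sketch, not part of the patch: the layout processRequest()
// expects in buf -- [varint header size][RequestHeader][varint param size][param],
// optionally followed by a cell block whose length comes from
// header.getCellBlockMeta().getLength(). The helper name is made up.
static byte[] encodeRequest(RequestHeader header, Message param) throws IOException {
  ByteArrayOutputStream bos = new ByteArrayOutputStream();
  header.writeDelimitedTo(bos); // varint size + header, as read via readRawVarint32()
  param.writeDelimitedTo(bos); // varint size + param
  return bos.toByteArray(); // a cell block, if any, would be appended here
}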
public abstract boolean isConnectionOpen();
public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md,
RequestHeader header, Message param, CellScanner cellScanner, long size, TraceInfo tinfo,
InetAddress remoteAddress, int timeout, CallCleanup reqCleanup);
private static class ByteBuffByteInput extends ByteInput {
private ByteBuff buf;
private int offset;
private int length;
ByteBuffByteInput(ByteBuff buf, int offset, int length) {
this.buf = buf;
this.offset = offset;
this.length = length;
}
@Override
public byte read(int offset) {
return this.buf.get(getAbsoluteOffset(offset));
}
private int getAbsoluteOffset(int offset) {
return this.offset + offset;
}
@Override
public int read(int offset, byte[] out, int outOffset, int len) {
this.buf.get(getAbsoluteOffset(offset), out, outOffset, len);
return len;
}
@Override
public int read(int offset, ByteBuffer out) {
int len = out.remaining();
this.buf.get(out, getAbsoluteOffset(offset), len);
return len;
}
@Override
public int size() {
return this.length;
}
}
}
View File
@@ -15,29 +15,20 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.BindException; import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.GatheringByteChannel; import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@@ -45,47 +36,26 @@ import java.util.Set;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.TraceInfo;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -125,7 +95,7 @@ public class SimpleRpcServer extends RpcServer {
// maintains the set of client connections and handles idle timeouts // maintains the set of client connections and handles idle timeouts
private ConnectionManager connectionManager; private ConnectionManager connectionManager;
private Listener listener = null; private Listener listener = null;
protected Responder responder = null; protected SimpleRpcServerResponder responder = null;
/** Listens on the socket. Creates jobs for the handler threads*/ /** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread { private class Listener extends Thread {
@@ -178,7 +148,7 @@
private class Reader implements Runnable { private class Reader implements Runnable {
final private LinkedBlockingQueue<Connection> pendingConnections; final private LinkedBlockingQueue<SimpleServerRpcConnection> pendingConnections;
private final Selector readSelector; private final Selector readSelector;
Reader() throws IOException { Reader() throws IOException {
@@ -206,7 +176,7 @@
// unbridled acceptance of connections that starves the select // unbridled acceptance of connections that starves the select
int size = pendingConnections.size(); int size = pendingConnections.size();
for (int i=size; i>0; i--) { for (int i=size; i>0; i--) {
Connection conn = pendingConnections.take(); SimpleServerRpcConnection conn = pendingConnections.take();
conn.channel.register(readSelector, SelectionKey.OP_READ, conn); conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
} }
readSelector.select(); readSelector.select();
@@ -238,7 +208,7 @@
* so the connection must be queued. The reader will drain the queue * so the connection must be queued. The reader will drain the queue
* and update its readSelector before performing the next select * and update its readSelector before performing the next select
*/ */
public void addConnection(Connection conn) throws IOException { public void addConnection(SimpleServerRpcConnection conn) throws IOException {
pendingConnections.add(conn); pendingConnections.add(conn);
readSelector.wakeup(); readSelector.wakeup();
} }
@@ -314,7 +284,7 @@
private void closeCurrentConnection(SelectionKey key, Throwable e) { private void closeCurrentConnection(SelectionKey key, Throwable e) {
if (key != null) { if (key != null) {
Connection c = (Connection)key.attachment(); SimpleServerRpcConnection c = (SimpleServerRpcConnection)key.attachment();
if (c != null) { if (c != null) {
closeConnection(c); closeConnection(c);
key.attach(null); key.attach(null);
@@ -334,7 +304,7 @@
channel.socket().setTcpNoDelay(tcpNoDelay); channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(tcpKeepAlive); channel.socket().setKeepAlive(tcpKeepAlive);
Reader reader = getReader(); Reader reader = getReader();
Connection c = connectionManager.register(channel); SimpleServerRpcConnection c = connectionManager.register(channel);
// If the connectionManager can't take it, close the connection. // If the connectionManager can't take it, close the connection.
if (c == null) { if (c == null) {
if (channel.isOpen()) { if (channel.isOpen()) {
@@ -349,7 +319,7 @@
void doRead(SelectionKey key) throws InterruptedException { void doRead(SelectionKey key) throws InterruptedException {
int count; int count;
Connection c = (Connection) key.attachment(); SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
if (c == null) { if (c == null) {
return; return;
} }
@@ -396,649 +366,6 @@
} }
} }
// Sends RPC responses back to clients.
protected class Responder extends Thread {
private final Selector writeSelector;
private final Set<Connection> writingCons =
Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
Responder() throws IOException {
this.setName("RpcServer.responder");
this.setDaemon(true);
this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
writeSelector = Selector.open(); // create a selector
}
@Override
public void run() {
LOG.debug(getName() + ": starting");
try {
doRunLoop();
} finally {
LOG.info(getName() + ": stopping");
try {
writeSelector.close();
} catch (IOException ioe) {
LOG.error(getName() + ": couldn't close write selector", ioe);
}
}
}
/**
* Take the list of the connections that want to write, and register them
* in the selector.
*/
private void registerWrites() {
Iterator<Connection> it = writingCons.iterator();
while (it.hasNext()) {
Connection c = it.next();
it.remove();
SelectionKey sk = c.channel.keyFor(writeSelector);
try {
if (sk == null) {
try {
c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
} catch (ClosedChannelException e) {
// ignore: the client went away.
if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
}
} else {
sk.interestOps(SelectionKey.OP_WRITE);
}
} catch (CancelledKeyException e) {
// ignore: the client went away.
if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
}
}
}
/**
* Add a connection to the list of connections that want to write.
*/
public void registerForWrite(Connection c) {
if (writingCons.add(c)) {
writeSelector.wakeup();
}
}
private void doRunLoop() {
long lastPurgeTime = 0; // last check for old calls.
while (running) {
try {
registerWrites();
int keyCt = writeSelector.select(purgeTimeout);
if (keyCt == 0) {
continue;
}
Set<SelectionKey> keys = writeSelector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isWritable()) {
doAsyncWrite(key);
}
} catch (IOException e) {
LOG.debug(getName() + ": asyncWrite", e);
}
}
lastPurgeTime = purge(lastPurgeTime);
} catch (OutOfMemoryError e) {
if (errorHandler != null) {
if (errorHandler.checkOOME(e)) {
LOG.info(getName() + ": exiting on OutOfMemoryError");
return;
}
} else {
//
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
//
LOG.warn(getName() + ": OutOfMemoryError in server select", e);
try {
Thread.sleep(60000);
} catch (InterruptedException ex) {
LOG.debug("Interrupted while sleeping");
return;
}
}
} catch (Exception e) {
LOG.warn(getName() + ": exception in Responder " +
StringUtils.stringifyException(e), e);
}
}
LOG.info(getName() + ": stopped");
}
/**
* If there were some calls that have not been sent out for a
* long time, we close the connection.
* @return the time of the purge.
*/
private long purge(long lastPurgeTime) {
long now = System.currentTimeMillis();
if (now < lastPurgeTime + purgeTimeout) {
return lastPurgeTime;
}
ArrayList<Connection> conWithOldCalls = new ArrayList<>();
// get the list of channels from list of keys.
synchronized (writeSelector.keys()) {
for (SelectionKey key : writeSelector.keys()) {
Connection connection = (Connection) key.attachment();
if (connection == null) {
throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
}
SimpleServerCall call = connection.responseQueue.peekFirst();
if (call != null && now > call.lastSentTime + purgeTimeout) {
conWithOldCalls.add(call.getConnection());
}
}
}
// Seems safer to close the connection outside of the synchronized loop...
for (Connection connection : conWithOldCalls) {
closeConnection(connection);
}
return now;
}
private void doAsyncWrite(SelectionKey key) throws IOException {
Connection connection = (Connection) key.attachment();
if (connection == null) {
throw new IOException("doAsyncWrite: no connection");
}
if (key.channel() != connection.channel) {
throw new IOException("doAsyncWrite: bad channel");
}
if (processAllResponses(connection)) {
try {
// We wrote everything, so we don't need to be told when the socket is ready for
// write anymore.
key.interestOps(0);
} catch (CancelledKeyException e) {
/* The Listener/reader might have closed the socket.
* We don't explicitly cancel the key, so not sure if this will
* ever fire.
* This warning could be removed.
*/
LOG.warn("Exception while changing ops : " + e);
}
}
}
/**
* Process the response for this call. You need to have the lock on
* {@link org.apache.hadoop.hbase.ipc.SimpleRpcServer.Connection#responseWriteLock}
*
* @param call the call
* @return true if we processed the call fully, false otherwise.
* @throws IOException
*/
private boolean processResponse(final SimpleServerCall call) throws IOException {
boolean error = true;
try {
// Send as much data as we can in the non-blocking fashion
long numBytes = channelWrite(call.getConnection().channel,
call.response);
if (numBytes < 0) {
throw new HBaseIOException("Error writing on the socket " +
"for the call:" + call.toShortString());
}
error = false;
} finally {
if (error) {
LOG.debug(getName() + call.toShortString() + ": output error -- closing");
// We will be closing this connection itself. Mark this call as done so that all the
// buffer(s) it got from pool can get released
call.done();
closeConnection(call.getConnection());
}
}
if (!call.response.hasRemaining()) {
call.done();
return true;
} else {
return false; // Socket can't take more, we will have to come back.
}
}
/**
* Process all the responses for this connection
*
* @return true if all the calls were processed or someone else is doing it.
* false if there is still some work to do. In this case, we expect the caller to
* delay us.
* @throws IOException
*/
private boolean processAllResponses(final Connection connection) throws IOException {
// We want only one writer on the channel for a connection at a time.
connection.responseWriteLock.lock();
try {
for (int i = 0; i < 20; i++) {
// protection in case some handlers manage to keep the responder permanently busy
SimpleServerCall call = connection.responseQueue.pollFirst();
if (call == null) {
return true;
}
if (!processResponse(call)) {
connection.responseQueue.addFirst(call);
return false;
}
}
} finally {
connection.responseWriteLock.unlock();
}
return connection.responseQueue.isEmpty();
}
//
// Enqueue a response from the application.
//
void doRespond(SimpleServerCall call) throws IOException {
boolean added = false;
// If there is already a write in progress, we don't wait. This frees the handlers
// immediately for other tasks.
if (call.getConnection().responseQueue.isEmpty()
&& call.getConnection().responseWriteLock.tryLock()) {
try {
if (call.getConnection().responseQueue.isEmpty()) {
// If we're alone, we can try to do a direct call to the socket. It's
// an optimisation to save on context switches and data transfer between cores.
if (processResponse(call)) {
return; // we're done.
}
// Too big to fit, putting ahead.
call.getConnection().responseQueue.addFirst(call);
added = true; // We will register to the selector later, outside of the lock.
}
} finally {
call.getConnection().responseWriteLock.unlock();
}
}
if (!added) {
call.getConnection().responseQueue.addLast(call);
}
call.responder.registerForWrite(call.getConnection());
// set the serve time when the response has to be sent later
call.lastSentTime = System.currentTimeMillis();
}
}
/** Reads calls from a connection and queues them for handling. */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="VO_VOLATILE_INCREMENT",
justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
public class Connection extends RpcServer.Connection {
protected SocketChannel channel;
private ByteBuff data;
private ByteBuffer dataLengthBuffer;
protected final ConcurrentLinkedDeque<SimpleServerCall> responseQueue = new ConcurrentLinkedDeque<>();
private final Lock responseWriteLock = new ReentrantLock();
private LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs
private long lastContact;
protected Socket socket;
public Connection(SocketChannel channel, long lastContact) {
super();
this.channel = channel;
this.lastContact = lastContact;
this.data = null;
this.dataLengthBuffer = ByteBuffer.allocate(4);
this.socket = channel.socket();
this.addr = socket.getInetAddress();
if (addr == null) {
this.hostAddress = "*Unknown*";
} else {
this.hostAddress = addr.getHostAddress();
}
this.remotePort = socket.getPort();
if (socketSendBufferSize != 0) {
try {
socket.setSendBufferSize(socketSendBufferSize);
} catch (IOException e) {
LOG.warn("Connection: unable to set socket send buffer size to " +
socketSendBufferSize);
}
}
this.saslCall = new SimpleServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null, responder);
this.setConnectionHeaderResponseCall = new SimpleServerCall(CONNECTION_HEADER_RESPONSE_CALLID,
null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0,
reservoir, cellBlockBuilder, null, responder);
this.authFailedCall = new SimpleServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null,
null, null, this, 0, null, null, System.currentTimeMillis(), 0, reservoir,
cellBlockBuilder, null, responder);
}
public void setLastContact(long lastContact) {
this.lastContact = lastContact;
}
public long getLastContact() {
return lastContact;
}
/* Return true if the connection has no outstanding rpc */
private boolean isIdle() {
return rpcCount.sum() == 0;
}
/* Decrement the outstanding RPC count */
protected void decRpcCount() {
rpcCount.decrement();
}
/* Increment the outstanding RPC count */
protected void incRpcCount() {
rpcCount.increment();
}
private int readPreamble() throws IOException {
int count;
// Check for 'HBas' magic.
this.dataLengthBuffer.flip();
if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
return doBadPreambleHandling("Expected HEADER=" +
Bytes.toStringBinary(HConstants.RPC_HEADER) +
" but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
" from " + toString());
}
// Now read the next two bytes, the version and the auth to use.
ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
count = channelRead(channel, versionAndAuthBytes);
if (count < 0 || versionAndAuthBytes.remaining() > 0) {
return count;
}
int version = versionAndAuthBytes.get(0);
byte authbyte = versionAndAuthBytes.get(1);
this.authMethod = AuthMethod.valueOf(authbyte);
if (version != CURRENT_VERSION) {
String msg = getFatalConnectionString(version, authbyte);
return doBadPreambleHandling(msg, new WrongVersionException(msg));
}
if (authMethod == null) {
String msg = getFatalConnectionString(version, authbyte);
return doBadPreambleHandling(msg, new BadAuthException(msg));
}
if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
if (allowFallbackToSimpleAuth) {
metrics.authenticationFallback();
authenticatedWithFallback = true;
} else {
AccessDeniedException ae = new AccessDeniedException("Authentication is required");
setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
authFailedCall.sendResponseIfReady();
throw ae;
}
}
if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
authMethod = AuthMethod.SIMPLE;
// client has already sent the initial Sasl message and we
// should ignore it. Both client and server should fall back
// to simple auth from now on.
skipInitialSaslHandshake = true;
}
if (authMethod != AuthMethod.SIMPLE) {
useSasl = true;
}
dataLengthBuffer.clear();
connectionPreambleRead = true;
return count;
}
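// An illustrative sketch, not part of the patch: the 6-byte preamble this method
// parses -- the 'HBas' magic, a version byte, then an auth-code byte. The method
// name is made up, and it assumes AuthMethod exposes its wire code as a public
// 'code' field, as the Hadoop AuthMethod does.
ByteBuffer examplePreamble() {
  ByteBuffer preamble = ByteBuffer.allocate(6);
  preamble.put(HConstants.RPC_HEADER); // 'H','B','a','s'
  preamble.put((byte) CURRENT_VERSION); // mismatch -> WrongVersionException
  preamble.put(AuthMethod.SIMPLE.code); // unknown code -> BadAuthException
  preamble.flip();
  return preamble;
}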
private int read4Bytes() throws IOException {
if (this.dataLengthBuffer.remaining() > 0) {
return channelRead(channel, this.dataLengthBuffer);
} else {
return 0;
}
}
/**
* Read off the wire. If there is not enough data to read, update the connection state with
* what we have and return.
* @return -1 on failure (the caller will close the connection), else zero or more.
* @throws IOException
* @throws InterruptedException
*/
public int readAndProcess() throws IOException, InterruptedException {
// Try and read in an int. If new connection, the int will hold the 'HBas' HEADER. If it
// does, read in the rest of the connection preamble, the version and the auth method.
// Else it will be the length of the data to read (or -1 if a ping). We read the integer
// length into the 4-byte this.dataLengthBuffer.
int count = read4Bytes();
if (count < 0 || dataLengthBuffer.remaining() > 0) {
return count;
}
// If we have not read the connection setup preamble, look to see if that is on the wire.
if (!connectionPreambleRead) {
count = readPreamble();
if (!connectionPreambleRead) {
return count;
}
count = read4Bytes();
if (count < 0 || dataLengthBuffer.remaining() > 0) {
return count;
}
}
// We have read a length and we have read the preamble. It is either the connection header
// or it is a request.
if (data == null) {
dataLengthBuffer.flip();
int dataLength = dataLengthBuffer.getInt();
if (dataLength == RpcClient.PING_CALL_ID) {
if (!useWrap) { //covers the !useSasl too
dataLengthBuffer.clear();
return 0; //ping message
}
}
if (dataLength < 0) { // A data length of zero is legal.
throw new DoNotRetryIOException("Unexpected data length "
+ dataLength + "!! from " + getHostAddress());
}
if (dataLength > maxRequestSize) {
String msg = "RPC data length of " + dataLength + " received from "
+ getHostAddress() + " is greater than max allowed "
+ maxRequestSize + ". Set \"" + MAX_REQUEST_SIZE
+ "\" on server to override this limit (not recommended)";
LOG.warn(msg);
if (connectionHeaderRead && connectionPreambleRead) {
incRpcCount();
// Construct InputStream for the non-blocking SocketChannel
// We need the InputStream because we want to read only the request header
// instead of the whole rpc.
ByteBuffer buf = ByteBuffer.allocate(1);
InputStream is = new InputStream() {
@Override
public int read() throws IOException {
channelRead(channel, buf);
buf.flip();
int x = buf.get() & 0xFF; // mask so bytes >= 0x80 are not mistaken for EOF
buf.flip();
return x;
}
};
CodedInputStream cis = CodedInputStream.newInstance(is);
int headerSize = cis.readRawVarint32();
Message.Builder builder = RequestHeader.newBuilder();
ProtobufUtil.mergeFrom(builder, cis, headerSize);
RequestHeader header = (RequestHeader) builder.build();
// Notify the client about the offending request
SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service,
null, null, null, null, this, 0, null, this.addr, System.currentTimeMillis(), 0,
reservoir, cellBlockBuilder, null, responder);
metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
// Make sure the client recognizes the underlying exception
// Otherwise, throw a DoNotRetryIOException.
if (VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) {
setupResponse(null, reqTooBig, REQUEST_TOO_BIG_EXCEPTION, msg);
} else {
setupResponse(null, reqTooBig, new DoNotRetryIOException(), msg);
}
// We are going to close the connection, make sure we process the response
// before that. In rare case when this fails, we still close the connection.
responseWriteLock.lock();
responder.processResponse(reqTooBig);
responseWriteLock.unlock();
}
// Close the connection
return -1;
}
// Initialize this.data with a ByteBuff.
// This call will allocate a ByteBuff to read the request into and assign it to this.data.
// Also, when we use some buffer(s) from the pool, it will create a CallCleanup instance and
// assign it to this.callCleanup.
initByteBuffToReadInto(dataLength);
// Increment the rpc count. This counter will be decreased when we write
// the response. If we want the connection to be detected as idle properly, we
// need to keep the inc / dec correct.
incRpcCount();
}
count = channelDataRead(channel, data);
if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
process();
}
return count;
}
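
  // A standalone sketch (editor's illustration, not part of this patch) of the framing
  // readAndProcess() expects once the preamble is done: a 4-byte big-endian length followed by
  // exactly that many payload bytes. Names are hypothetical; the loop matters because write()
  // can be partial on a non-blocking socket.
  static void writeFramedRequest(java.nio.channels.SocketChannel ch, byte[] payload)
      throws IOException {
    java.nio.ByteBuffer frame = java.nio.ByteBuffer.allocate(4 + payload.length);
    frame.putInt(payload.length); // big-endian length prefix, matching dataLengthBuffer.getInt()
    frame.put(payload); // the request header + body bytes
    frame.flip();
    while (frame.hasRemaining()) {
      ch.write(frame); // may write fewer bytes than remaining; loop until drained
    }
  }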
// Creates the ByteBuff and CallCleanup and assigns them to the Connection instance.
private void initByteBuffToReadInto(int length) {
// We create fresh on-heap buffers and read into those when
// 1. The ByteBufferPool is not there.
// 2. The size of the request is very small. Using a large (64 KB) buffer from the pool is
// wasteful then. Also, if all the requests were of this size, we would end up creating large
// buffers and pooling them permanently. This includes Scan/Get requests and DDL kind of
// requests like RegionOpen.
// 3. It is an initial handshake signal or an initial connection request. Anyway, condition 2
// itself will match then.
// 4. When SASL use is ON.
if (reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead || useSasl
|| length < minSizeForReservoirUse) {
this.data = new SingleByteBuff(ByteBuffer.allocate(length));
} else {
Pair<ByteBuff, CallCleanup> pair = RpcServer.allocateByteBuffToReadInto(reservoir,
minSizeForReservoirUse, length);
this.data = pair.getFirst();
this.callCleanup = pair.getSecond();
}
}
protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
int count = buf.read(channel);
if (count > 0) {
metrics.receivedBytes(count);
}
return count;
}
/**
* Process the data buffer and clean the connection state for the next call.
*/
private void process() throws IOException, InterruptedException {
data.rewind();
try {
if (skipInitialSaslHandshake) {
skipInitialSaslHandshake = false;
return;
}
if (useSasl) {
saslReadAndProcess(data);
} else {
processOneRpc(data);
}
} finally {
dataLengthBuffer.clear(); // Clean for the next call
data = null; // For the GC
this.callCleanup = null;
}
}
private int doBadPreambleHandling(final String msg) throws IOException {
return doBadPreambleHandling(msg, new FatalConnectionException(msg));
}
private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
LOG.warn(msg);
SimpleServerCall fakeCall = new SimpleServerCall(-1, null, null, null, null, null, this, -1,
null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null, responder);
setupResponse(null, fakeCall, e, msg);
responder.doRespond(fakeCall);
// Returning -1 closes out the connection.
return -1;
}
@Override
public synchronized void close() {
disposeSasl();
data = null;
callCleanup = null;
if (!channel.isOpen())
return;
try {socket.shutdownOutput();} catch(Exception ignored) {
if (LOG.isTraceEnabled()) {
LOG.trace("Ignored exception", ignored);
}
}
if (channel.isOpen()) {
try {channel.close();} catch(Exception ignored) {}
}
try {
socket.close();
} catch(Exception ignored) {
if (LOG.isTraceEnabled()) {
LOG.trace("Ignored exception", ignored);
}
}
}
@Override
public boolean isConnectionOpen() {
return channel.isOpen();
}
@Override
public ServerCall createCall(int id, final BlockingService service, final MethodDescriptor md,
RequestHeader header, Message param, CellScanner cellScanner,
RpcServer.Connection connection, long size, TraceInfo tinfo,
final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
return new SimpleServerCall(id, service, md, header, param, cellScanner, connection, size,
tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder,
reqCleanup, responder);
}
}
   /**
    * Constructs a server listening on the named port and address.
    * @param server hosting instance of {@link Server}. We will do authentications if an
@ -1065,7 +392,7 @@ public class SimpleRpcServer extends RpcServer
     this.port = listener.getAddress().getPort();
     // Create the responder here
-    responder = new Responder();
+    responder = new SimpleRpcServerResponder(this);
     connectionManager = new ConnectionManager();
     initReconfigurable(conf);
@ -1076,11 +403,11 @@ public class SimpleRpcServer extends RpcServer
    * Subclasses of HBaseServer can override this to provide their own
    * Connection implementations.
    */
-  protected Connection getConnection(SocketChannel channel, long time) {
-    return new Connection(channel, time);
+  protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
+    return new SimpleServerRpcConnection(this, channel, time);
   }
-  protected void closeConnection(Connection connection) {
+  protected void closeConnection(SimpleServerRpcConnection connection) {
     connectionManager.close(connection);
   }
@ -1228,7 +555,7 @@ public class SimpleRpcServer extends RpcServer
   private class ConnectionManager {
     final private AtomicInteger count = new AtomicInteger();
-    final private Set<Connection> connections;
+    final private Set<SimpleServerRpcConnection> connections;
     final private Timer idleScanTimer;
     final private int idleScanThreshold;
@ -1250,11 +577,11 @@ public class SimpleRpcServer extends RpcServer
       // create a set with concurrency -and- a thread-safe iterator, add 2
       // for listener and idle closer threads
       this.connections = Collections.newSetFromMap(
-          new ConcurrentHashMap<Connection,Boolean>(
+          new ConcurrentHashMap<SimpleServerRpcConnection,Boolean>(
               maxConnectionQueueSize, 0.75f, readThreads+2));
     }
-    private boolean add(Connection connection) {
+    private boolean add(SimpleServerRpcConnection connection) {
       boolean added = connections.add(connection);
       if (added) {
         count.getAndIncrement();
@ -1262,7 +589,7 @@ public class SimpleRpcServer extends RpcServer
       return added;
     }
-    private boolean remove(Connection connection) {
+    private boolean remove(SimpleServerRpcConnection connection) {
       boolean removed = connections.remove(connection);
       if (removed) {
         count.getAndDecrement();
@ -1274,12 +601,12 @@ public class SimpleRpcServer extends RpcServer
       return count.get();
     }
-    Connection[] toArray() {
-      return connections.toArray(new Connection[0]);
+    SimpleServerRpcConnection[] toArray() {
+      return connections.toArray(new SimpleServerRpcConnection[0]);
     }
-    Connection register(SocketChannel channel) {
-      Connection connection = getConnection(channel, System.currentTimeMillis());
+    SimpleServerRpcConnection register(SocketChannel channel) {
+      SimpleServerRpcConnection connection = getConnection(channel, System.currentTimeMillis());
       add(connection);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Server connection from " + connection +
@ -1291,7 +618,7 @@ public class SimpleRpcServer extends RpcServer
       return connection;
     }
-    boolean close(Connection connection) {
+    boolean close(SimpleServerRpcConnection connection) {
       boolean exists = remove(connection);
       if (exists) {
         if (LOG.isDebugEnabled()) {
@ -1314,7 +641,7 @@ public class SimpleRpcServer extends RpcServer
       // during the iteration, but that's ok because they won't
       // be idle yet anyway and will be caught on next scan
       int closed = 0;
-      for (Connection connection : connections) {
+      for (SimpleServerRpcConnection connection : connections) {
         // stop if connections dropped below threshold unless scanning all
         if (!scanAll && size() < idleScanThreshold) {
           break;
@ -1332,7 +659,7 @@ public class SimpleRpcServer extends RpcServer
     void closeAll() {
       // use a copy of the connections to be absolutely sure the concurrent
       // iterator doesn't miss a connection
-      for (Connection connection : toArray()) {
+      for (SimpleServerRpcConnection connection : toArray()) {
         close(connection);
       }
     }

View File

@ -0,0 +1,316 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
/**
* Sends responses of RPC back to clients.
*/
@InterfaceAudience.Private
class SimpleRpcServerResponder extends Thread {
private final SimpleRpcServer simpleRpcServer;
private final Selector writeSelector;
private final Set<SimpleServerRpcConnection> writingCons =
Collections.newSetFromMap(new ConcurrentHashMap<SimpleServerRpcConnection, Boolean>());
SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException {
this.simpleRpcServer = simpleRpcServer;
this.setName("RpcServer.responder");
this.setDaemon(true);
this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
writeSelector = Selector.open(); // create a selector
}
@Override
public void run() {
SimpleRpcServer.LOG.debug(getName() + ": starting");
try {
doRunLoop();
} finally {
SimpleRpcServer.LOG.info(getName() + ": stopping");
try {
writeSelector.close();
} catch (IOException ioe) {
SimpleRpcServer.LOG.error(getName() + ": couldn't close write selector", ioe);
}
}
}
/**
* Take the list of the connections that want to write, and register them in the selector.
*/
private void registerWrites() {
Iterator<SimpleServerRpcConnection> it = writingCons.iterator();
while (it.hasNext()) {
SimpleServerRpcConnection c = it.next();
it.remove();
SelectionKey sk = c.channel.keyFor(writeSelector);
try {
if (sk == null) {
try {
c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
} catch (ClosedChannelException e) {
// ignore: the client went away.
if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e);
}
} else {
sk.interestOps(SelectionKey.OP_WRITE);
}
} catch (CancelledKeyException e) {
// ignore: the client went away.
if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e);
}
}
}
/**
 * Add a connection to the list of connections that want to write.
*/
public void registerForWrite(SimpleServerRpcConnection c) {
if (writingCons.add(c)) {
writeSelector.wakeup();
}
}
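
  // A minimal sketch (editor's illustration, not part of this patch) of the add-then-wakeup
  // idiom above: a thread blocked in select() will not notice newly queued work on its own, so
  // the producer must wake the selector; the Set's add() result also dedupes, so a connection
  // queued twice triggers only one wakeup. Names are hypothetical.
  static void enqueueForWrite(Set<java.nio.channels.SocketChannel> pending,
      java.nio.channels.SocketChannel ch, Selector selector) {
    if (pending.add(ch)) { // false => already queued; no extra wakeup needed
      selector.wakeup(); // unblock select() so registerWrites() runs promptly
    }
  }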
private void doRunLoop() {
long lastPurgeTime = 0; // last check for old calls.
while (this.simpleRpcServer.running) {
try {
registerWrites();
int keyCt = writeSelector.select(this.simpleRpcServer.purgeTimeout);
if (keyCt == 0) {
continue;
}
Set<SelectionKey> keys = writeSelector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isWritable()) {
doAsyncWrite(key);
}
} catch (IOException e) {
SimpleRpcServer.LOG.debug(getName() + ": asyncWrite", e);
}
}
lastPurgeTime = purge(lastPurgeTime);
} catch (OutOfMemoryError e) {
if (this.simpleRpcServer.errorHandler != null) {
if (this.simpleRpcServer.errorHandler.checkOOME(e)) {
SimpleRpcServer.LOG.info(getName() + ": exiting on OutOfMemoryError");
return;
}
} else {
//
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
//
SimpleRpcServer.LOG.warn(getName() + ": OutOfMemoryError in server select", e);
try {
Thread.sleep(60000);
} catch (InterruptedException ex) {
SimpleRpcServer.LOG.debug("Interrupted while sleeping");
return;
}
}
} catch (Exception e) {
SimpleRpcServer.LOG
.warn(getName() + ": exception in Responder " + StringUtils.stringifyException(e), e);
}
}
SimpleRpcServer.LOG.info(getName() + ": stopped");
}
/**
 * If some calls have not been sent out for a long time, we close the connection.
* @return the time of the purge.
*/
private long purge(long lastPurgeTime) {
long now = System.currentTimeMillis();
if (now < lastPurgeTime + this.simpleRpcServer.purgeTimeout) {
return lastPurgeTime;
}
ArrayList<SimpleServerRpcConnection> conWithOldCalls = new ArrayList<>();
// get the list of channels from list of keys.
synchronized (writeSelector.keys()) {
for (SelectionKey key : writeSelector.keys()) {
SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
if (connection == null) {
throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
}
SimpleServerCall call = connection.responseQueue.peekFirst();
if (call != null && now > call.lastSentTime + this.simpleRpcServer.purgeTimeout) {
conWithOldCalls.add(call.getConnection());
}
}
}
// Seems safer to close the connection outside of the synchronized loop...
for (SimpleServerRpcConnection connection : conWithOldCalls) {
this.simpleRpcServer.closeConnection(connection);
}
return now;
}
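
  // A worked example (editor's illustration, not part of this patch) of the purge test above:
  // with purgeTimeout = 15000 ms, a call whose lastSentTime is 100000 is purged once now
  // exceeds 115000, i.e. once it has sat unsent for a full purgeTimeout. Names are hypothetical.
  static boolean shouldPurge(long now, long lastSentTime, long purgeTimeout) {
    return now > lastSentTime + purgeTimeout; // e.g. 115001 > 100000 + 15000
  }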
private void doAsyncWrite(SelectionKey key) throws IOException {
SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment();
if (connection == null) {
throw new IOException("doAsyncWrite: no connection");
}
if (key.channel() != connection.channel) {
throw new IOException("doAsyncWrite: bad channel");
}
if (processAllResponses(connection)) {
try {
// We wrote everything, so we don't need to be told when the socket is ready for
// write anymore.
key.interestOps(0);
} catch (CancelledKeyException e) {
/*
* The Listener/reader might have closed the socket. We don't explicitly cancel the key, so
* not sure if this will ever fire. This warning could be removed.
*/
SimpleRpcServer.LOG.warn("Exception while changing ops : " + e);
}
}
}
/**
* Process the response for this call. You need to have the lock on
* {@link org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection#responseWriteLock}
* @param call the call
 * @return true if we processed the call fully, false otherwise.
* @throws IOException
*/
boolean processResponse(final SimpleServerCall call) throws IOException {
boolean error = true;
try {
// Send as much data as we can in the non-blocking fashion
long numBytes =
this.simpleRpcServer.channelWrite(call.getConnection().channel, call.response);
if (numBytes < 0) {
throw new HBaseIOException(
"Error writing on the socket " + "for the call:" + call.toShortString());
}
error = false;
} finally {
if (error) {
SimpleRpcServer.LOG.debug(getName() + call.toShortString() + ": output error -- closing");
// We will be closing this connection itself. Mark this call as done so that all the
// buffer(s) it got from pool can get released
call.done();
this.simpleRpcServer.closeConnection(call.getConnection());
}
}
if (!call.response.hasRemaining()) {
call.done();
return true;
} else {
return false; // Socket can't take more, we will have to come back.
}
}
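
  // A minimal sketch (editor's illustration, not part of this patch) of the partial-write
  // contract processResponse() builds on: a non-blocking write() may consume anywhere from zero
  // to all remaining bytes, so the caller keeps the buffer and retries when the selector reports
  // OP_WRITE again. Names are hypothetical.
  static boolean tryWrite(java.nio.channels.SocketChannel ch, java.nio.ByteBuffer response)
      throws IOException {
    ch.write(response); // non-blocking: may write 0..remaining() bytes
    return !response.hasRemaining(); // true => done; false => come back on OP_WRITE
  }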
/**
* Process all the responses for this connection
   * @return true if all the calls were processed or someone else is doing it; false if there
   *         is still some work to do. In this case, we expect the caller to delay us.
* @throws IOException
*/
private boolean processAllResponses(final SimpleServerRpcConnection connection)
throws IOException {
// We want only one writer on the channel for a connection at a time.
connection.responseWriteLock.lock();
try {
for (int i = 0; i < 20; i++) {
// protection if some handlers manage to monopolize the responder
SimpleServerCall call = connection.responseQueue.pollFirst();
if (call == null) {
return true;
}
if (!processResponse(call)) {
connection.responseQueue.addFirst(call);
return false;
}
}
} finally {
connection.responseWriteLock.unlock();
}
return connection.responseQueue.isEmpty();
}
//
// Enqueue a response from the application.
//
void doRespond(SimpleServerCall call) throws IOException {
boolean added = false;
// If there is already a write in progress, we don't wait. This allows to free the handlers
// immediately for other tasks.
if (call.getConnection().responseQueue.isEmpty() &&
call.getConnection().responseWriteLock.tryLock()) {
try {
if (call.getConnection().responseQueue.isEmpty()) {
// If we're alone, we can try to do a direct call to the socket. It's
// an optimisation to save on context switches and data transfer between cores.
if (processResponse(call)) {
return; // we're done.
}
// Too big to fit, putting ahead.
call.getConnection().responseQueue.addFirst(call);
added = true; // We will register to the selector later, outside of the lock.
}
} finally {
call.getConnection().responseWriteLock.unlock();
}
}
if (!added) {
call.getConnection().responseQueue.addLast(call);
}
call.responder.registerForWrite(call.getConnection());
// set the serve time when the response has to be sent later
call.lastSentTime = System.currentTimeMillis();
}
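
  // A minimal sketch (editor's illustration, not part of this patch) of the tryLock fast path
  // in doRespond() above, reduced to its skeleton: the handler thread writes inline when nobody
  // else is writing, and only hands off to the responder thread (return false) when the socket
  // cannot take the whole response. All names are hypothetical.
  static <T> boolean writeInlineOrEnqueue(java.util.Deque<T> queue,
      java.util.concurrent.locks.Lock lock, T call, java.util.function.Predicate<T> writeFully) {
    boolean queued = false;
    if (queue.isEmpty() && lock.tryLock()) { // contend only when nobody is writing
      try {
        if (queue.isEmpty()) { // re-check under the lock
          if (writeFully.test(call)) {
            return true; // fully written inline; no selector round trip needed
          }
          queue.addFirst(call); // socket full: keep this response first in line
          queued = true;
        }
      } finally {
        lock.unlock();
      }
    }
    if (!queued) {
      queue.addLast(call); // someone else is writing; just append
    }
    return false; // caller must register the connection for OP_WRITE
  }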
}

View File

@ -24,8 +24,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
-import org.apache.hadoop.hbase.ipc.SimpleRpcServer.Connection;
-import org.apache.hadoop.hbase.ipc.SimpleRpcServer.Responder;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
@ -37,19 +35,19 @@ import org.apache.htrace.TraceInfo;
  * result.
  */
 @InterfaceAudience.Private
-class SimpleServerCall extends ServerCall {
+class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> {
   long lastSentTime;
-  final Responder responder;
+  final SimpleRpcServerResponder responder;
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
       justification = "Can't figure why this complaint is happening... see below")
   SimpleServerCall(int id, final BlockingService service, final MethodDescriptor md,
-      RequestHeader header, Message param, CellScanner cellScanner, RpcServer.Connection connection,
-      long size, TraceInfo tinfo, final InetAddress remoteAddress, long receiveTime, int timeout,
-      ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup,
-      Responder responder) {
+      RequestHeader header, Message param, CellScanner cellScanner,
+      SimpleServerRpcConnection connection, long size, TraceInfo tinfo,
+      final InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
+      CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup,
+      SimpleRpcServerResponder responder) {
     super(id, service, md, header, param, cellScanner, connection, size, tinfo, remoteAddress,
         receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
     this.responder = responder;
@ -73,7 +71,7 @@ class SimpleServerCall extends ServerCall {
     this.responder.doRespond(this);
   }
-  Connection getConnection() {
-    return (Connection) this.connection;
+  SimpleServerRpcConnection getConnection() {
+    return (SimpleServerRpcConnection) this.connection;
   }
 }

View File

@ -0,0 +1,428 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.IntWritable;
import org.apache.htrace.TraceInfo;
/** Reads calls from a connection and queues them for handling. */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
@InterfaceAudience.Private
class SimpleServerRpcConnection extends ServerRpcConnection {
final SocketChannel channel;
private ByteBuff data;
private ByteBuffer dataLengthBuffer;
protected final ConcurrentLinkedDeque<SimpleServerCall> responseQueue =
new ConcurrentLinkedDeque<>();
final Lock responseWriteLock = new ReentrantLock();
private final LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs
private long lastContact;
private final Socket socket;
private final SimpleRpcServerResponder responder;
public SimpleServerRpcConnection(SimpleRpcServer rpcServer, SocketChannel channel,
long lastContact) {
super(rpcServer);
this.channel = channel;
this.lastContact = lastContact;
this.data = null;
this.dataLengthBuffer = ByteBuffer.allocate(4);
this.socket = channel.socket();
this.addr = socket.getInetAddress();
if (addr == null) {
this.hostAddress = "*Unknown*";
} else {
this.hostAddress = addr.getHostAddress();
}
this.remotePort = socket.getPort();
if (rpcServer.socketSendBufferSize != 0) {
try {
socket.setSendBufferSize(rpcServer.socketSendBufferSize);
} catch (IOException e) {
SimpleRpcServer.LOG.warn(
"Connection: unable to set socket send buffer size to " + rpcServer.socketSendBufferSize);
}
}
this.saslCall = new SimpleServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
null, System.currentTimeMillis(), 0, rpcServer.reservoir, rpcServer.cellBlockBuilder, null,
rpcServer.responder);
this.setConnectionHeaderResponseCall = new SimpleServerCall(CONNECTION_HEADER_RESPONSE_CALLID,
null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0,
rpcServer.reservoir, rpcServer.cellBlockBuilder, null, rpcServer.responder);
this.authFailedCall = new SimpleServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null,
null, this, 0, null, null, System.currentTimeMillis(), 0, rpcServer.reservoir,
rpcServer.cellBlockBuilder, null, rpcServer.responder);
this.responder = rpcServer.responder;
}
public void setLastContact(long lastContact) {
this.lastContact = lastContact;
}
public long getLastContact() {
return lastContact;
}
/* Return true if the connection has no outstanding rpc */
boolean isIdle() {
return rpcCount.sum() == 0;
}
/* Decrement the outstanding RPC count */
protected void decRpcCount() {
rpcCount.decrement();
}
/* Increment the outstanding RPC count */
protected void incRpcCount() {
rpcCount.increment();
}
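
  // A minimal sketch (editor's illustration, not part of this patch) of the invariant behind
  // rpcCount: one increment when a request is read off the wire, one decrement when its response
  // is written, so isIdle() holds exactly when no response is still owed. Names are hypothetical.
  static boolean idleAfter(LongAdder outstanding, int requestsRead, int responsesWritten) {
    for (int i = 0; i < requestsRead; i++) {
      outstanding.increment(); // mirrors incRpcCount() in readAndProcess()
    }
    for (int i = 0; i < responsesWritten; i++) {
      outstanding.decrement(); // mirrors decRpcCount() when the response goes out
    }
    return outstanding.sum() == 0; // mirrors isIdle()
  }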
private int readPreamble() throws IOException {
int count;
// Check for 'HBas' magic.
this.dataLengthBuffer.flip();
if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
return doBadPreambleHandling(
"Expected HEADER=" + Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" +
Bytes.toStringBinary(dataLengthBuffer.array()) + " from " + toString());
}
// Now read the next two bytes, the version and the auth to use.
ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
count = this.rpcServer.channelRead(channel, versionAndAuthBytes);
if (count < 0 || versionAndAuthBytes.remaining() > 0) {
return count;
}
int version = versionAndAuthBytes.get(0);
byte authbyte = versionAndAuthBytes.get(1);
this.authMethod = AuthMethod.valueOf(authbyte);
if (version != SimpleRpcServer.CURRENT_VERSION) {
String msg = getFatalConnectionString(version, authbyte);
return doBadPreambleHandling(msg, new WrongVersionException(msg));
}
if (authMethod == null) {
String msg = getFatalConnectionString(version, authbyte);
return doBadPreambleHandling(msg, new BadAuthException(msg));
}
if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
if (this.rpcServer.allowFallbackToSimpleAuth) {
this.rpcServer.metrics.authenticationFallback();
authenticatedWithFallback = true;
} else {
AccessDeniedException ae = new AccessDeniedException("Authentication is required");
this.rpcServer.setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
authFailedCall.sendResponseIfReady();
throw ae;
}
}
if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
null);
authMethod = AuthMethod.SIMPLE;
// client has already sent the initial Sasl message and we
// should ignore it. Both client and server should fall back
// to simple auth from now on.
skipInitialSaslHandshake = true;
}
if (authMethod != AuthMethod.SIMPLE) {
useSasl = true;
}
dataLengthBuffer.clear();
connectionPreambleRead = true;
return count;
}
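
  // A minimal sketch (editor's illustration, not part of this patch) of the 6-byte preamble
  // readPreamble() parses: the 4-byte magic 'HBas' (HConstants.RPC_HEADER), one version byte
  // that must match SimpleRpcServer.CURRENT_VERSION, and one AuthMethod code byte. A real
  // client would take the argument values from those constants.
  static ByteBuffer buildPreamble(byte version, byte authCode) {
    ByteBuffer preamble = ByteBuffer.allocate(6);
    preamble.put(new byte[] { 'H', 'B', 'a', 's' }); // must equal HConstants.RPC_HEADER
    preamble.put(version); // mismatch => WrongVersionException, connection closed
    preamble.put(authCode); // unknown code => BadAuthException, connection closed
    preamble.flip();
    return preamble;
  }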
private int read4Bytes() throws IOException {
if (this.dataLengthBuffer.remaining() > 0) {
return this.rpcServer.channelRead(channel, this.dataLengthBuffer);
} else {
return 0;
}
}
/**
* Read off the wire. If there is not enough data to read, update the connection state with what
 * we have and return.
* @return Returns -1 if failure (and caller will close connection), else zero or more.
* @throws IOException
* @throws InterruptedException
*/
public int readAndProcess() throws IOException, InterruptedException {
// Try and read in an int. If new connection, the int will hold the 'HBas' HEADER. If it
// does, read in the rest of the connection preamble, the version and the auth method.
// Else it will be the length of the data to read (or -1 if a ping). We read the integer
// length into the 4-byte this.dataLengthBuffer.
int count = read4Bytes();
if (count < 0 || dataLengthBuffer.remaining() > 0) {
return count;
}
// If we have not read the connection setup preamble, look to see if that is on the wire.
if (!connectionPreambleRead) {
count = readPreamble();
if (!connectionPreambleRead) {
return count;
}
count = read4Bytes();
if (count < 0 || dataLengthBuffer.remaining() > 0) {
return count;
}
}
// We have read a length and we have read the preamble. It is either the connection header
// or it is a request.
if (data == null) {
dataLengthBuffer.flip();
int dataLength = dataLengthBuffer.getInt();
if (dataLength == RpcClient.PING_CALL_ID) {
if (!useWrap) { // covers the !useSasl too
dataLengthBuffer.clear();
return 0; // ping message
}
}
if (dataLength < 0) { // A data length of zero is legal.
throw new DoNotRetryIOException(
"Unexpected data length " + dataLength + "!! from " + getHostAddress());
}
if (dataLength > this.rpcServer.maxRequestSize) {
String msg = "RPC data length of " + dataLength + " received from " + getHostAddress() +
" is greater than max allowed " + this.rpcServer.maxRequestSize + ". Set \"" +
SimpleRpcServer.MAX_REQUEST_SIZE +
"\" on server to override this limit (not recommended)";
SimpleRpcServer.LOG.warn(msg);
if (connectionHeaderRead && connectionPreambleRead) {
incRpcCount();
// Construct InputStream for the non-blocking SocketChannel
// We need the InputStream because we want to read only the request header
// instead of the whole rpc.
ByteBuffer buf = ByteBuffer.allocate(1);
InputStream is = new InputStream() {
@Override
public int read() throws IOException {
SimpleServerRpcConnection.this.rpcServer.channelRead(channel, buf);
buf.flip();
int x = buf.get();
buf.flip();
return x;
}
};
CodedInputStream cis = CodedInputStream.newInstance(is);
int headerSize = cis.readRawVarint32();
Message.Builder builder = RequestHeader.newBuilder();
ProtobufUtil.mergeFrom(builder, cis, headerSize);
RequestHeader header = (RequestHeader) builder.build();
// Notify the client about the offending request
SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null,
null, null, null, this, 0, null, this.addr, System.currentTimeMillis(), 0,
this.rpcServer.reservoir, this.rpcServer.cellBlockBuilder, null, responder);
this.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
// Make sure the client recognizes the underlying exception
// Otherwise, throw a DoNotRetryIOException.
if (VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) {
this.rpcServer.setupResponse(null, reqTooBig, SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION,
msg);
} else {
this.rpcServer.setupResponse(null, reqTooBig, new DoNotRetryIOException(), msg);
}
// We are going to close the connection, make sure we process the response
// before that. In rare case when this fails, we still close the connection.
responseWriteLock.lock();
try {
this.responder.processResponse(reqTooBig);
} finally {
responseWriteLock.unlock();
}
}
// Close the connection
return -1;
}
// Initialize this.data with a ByteBuff.
// This call will allocate a ByteBuff to read the request into and assign it to this.data.
// Also, when we use some buffer(s) from the pool, it will create a CallCleanup instance and
// assign it to this.callCleanup.
initByteBuffToReadInto(dataLength);
// Increment the rpc count. This counter will be decreased when we write
// the response. If we want the connection to be detected as idle properly, we
// need to keep the inc / dec correct.
incRpcCount();
}
count = channelDataRead(channel, data);
if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
process();
}
return count;
}
// Creates the ByteBuff and CallCleanup and assigns them to the Connection instance.
private void initByteBuffToReadInto(int length) {
// We create fresh on-heap buffers and read into those when
// 1. The ByteBufferPool is not there.
// 2. The size of the request is very small. Using a large (64 KB) buffer from the pool is
// wasteful then. Also, if all the requests were of this size, we would end up creating large
// buffers and pooling them permanently. This includes Scan/Get requests and DDL kind of
// requests like RegionOpen.
// 3. It is an initial handshake signal or an initial connection request. Anyway, condition 2
// itself will match then.
// 4. When SASL use is ON.
if (this.rpcServer.reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead ||
useSasl || length < this.rpcServer.minSizeForReservoirUse) {
this.data = new SingleByteBuff(ByteBuffer.allocate(length));
} else {
Pair<ByteBuff, CallCleanup> pair = RpcServer.allocateByteBuffToReadInto(
this.rpcServer.reservoir, this.rpcServer.minSizeForReservoirUse, length);
this.data = pair.getFirst();
this.callCleanup = pair.getSecond();
}
}
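
  // The branching above, reduced to a single predicate (editor's illustration, not part of this
  // patch): pooled buffers only pay off for large, post-handshake, non-SASL request bodies;
  // everything else gets a throwaway heap buffer. "handshakeDone" folds together
  // !skipInitialSaslHandshake && connectionHeaderRead; parameter names are hypothetical.
  static boolean usePooledBuffer(boolean havePool, boolean handshakeDone, boolean usingSasl,
      int length, int minSizeForPool) {
    return havePool && handshakeDone && !usingSasl && length >= minSizeForPool;
  }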
protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
int count = buf.read(channel);
if (count > 0) {
this.rpcServer.metrics.receivedBytes(count);
}
return count;
}
/**
* Process the data buffer and clean the connection state for the next call.
*/
private void process() throws IOException, InterruptedException {
data.rewind();
try {
if (skipInitialSaslHandshake) {
skipInitialSaslHandshake = false;
return;
}
if (useSasl) {
saslReadAndProcess(data);
} else {
processOneRpc(data);
}
} finally {
dataLengthBuffer.clear(); // Clean for the next call
data = null; // For the GC
this.callCleanup = null;
}
}
private int doBadPreambleHandling(final String msg) throws IOException {
return doBadPreambleHandling(msg, new FatalConnectionException(msg));
}
private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
SimpleRpcServer.LOG.warn(msg);
SimpleServerCall fakeCall = new SimpleServerCall(-1, null, null, null, null, null, this, -1,
null, null, System.currentTimeMillis(), 0, this.rpcServer.reservoir,
this.rpcServer.cellBlockBuilder, null, responder);
this.rpcServer.setupResponse(null, fakeCall, e, msg);
this.responder.doRespond(fakeCall);
// Returning -1 closes out the connection.
return -1;
}
@Override
public synchronized void close() {
disposeSasl();
data = null;
callCleanup = null;
if (!channel.isOpen()) return;
try {
socket.shutdownOutput();
} catch (Exception ignored) {
if (SimpleRpcServer.LOG.isTraceEnabled()) {
SimpleRpcServer.LOG.trace("Ignored exception", ignored);
}
}
if (channel.isOpen()) {
try {
channel.close();
} catch (Exception ignored) {
}
}
try {
socket.close();
} catch (Exception ignored) {
if (SimpleRpcServer.LOG.isTraceEnabled()) {
SimpleRpcServer.LOG.trace("Ignored exception", ignored);
}
}
}
@Override
public boolean isConnectionOpen() {
return channel.isOpen();
}
@Override
public SimpleServerCall createCall(int id, BlockingService service, MethodDescriptor md,
RequestHeader header, Message param, CellScanner cellScanner, long size, TraceInfo tinfo,
InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size, tinfo,
remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
}
}

View File

@ -23,6 +23,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.Locale;
 import java.util.Map;
+import java.util.function.Consumer;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@ -34,14 +35,13 @@ import javax.security.sasl.RealmCallback;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
 /**
  * A utility class for dealing with SASL on RPC server
@ -79,13 +79,12 @@ public class HBaseSaslRpcServer {
   /** CallbackHandler for SASL DIGEST-MD5 mechanism */
   public static class SaslDigestCallbackHandler implements CallbackHandler {
     private SecretManager<TokenIdentifier> secretManager;
-    private RpcServer.Connection connection;
+    private Consumer<UserGroupInformation> attemptingUserConsumer;
-    public SaslDigestCallbackHandler(
-        SecretManager<TokenIdentifier> secretManager,
-        RpcServer.Connection connection) {
+    public SaslDigestCallbackHandler(SecretManager<TokenIdentifier> secretManager,
+        Consumer<UserGroupInformation> attemptingUserConsumer) {
       this.secretManager = secretManager;
-      this.connection = connection;
+      this.attemptingUserConsumer = attemptingUserConsumer;
     }
     private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken {
@ -116,12 +115,11 @@ public class HBaseSaslRpcServer {
       if (pc != null) {
         TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(), secretManager);
         char[] password = getPassword(tokenIdentifier);
-        UserGroupInformation user = null;
-        user = tokenIdentifier.getUser(); // may throw exception
-        connection.attemptingUser = user;
+        UserGroupInformation user = tokenIdentifier.getUser(); // may throw exception
+        attemptingUserConsumer.accept(user);
         if (LOG.isTraceEnabled()) {
-          LOG.trace("SASL server DIGEST-MD5 callback: setting password "
-              + "for client: " + tokenIdentifier.getUser());
+          LOG.trace("SASL server DIGEST-MD5 callback: setting password " + "for client: " +
+              tokenIdentifier.getUser());
         }
         pc.setPassword(password);
       }

View File

@ -315,9 +315,10 @@ public abstract class AbstractTestIPC {
           new InetSocketAddress("localhost", 0), conf, scheduler);
     }
-    class FailingConnection extends Connection {
-      public FailingConnection(SocketChannel channel, long lastContact) {
-        super(channel, lastContact);
+    class FailingConnection extends SimpleServerRpcConnection {
+      public FailingConnection(TestFailingRpcServer rpcServer, SocketChannel channel,
+          long lastContact) {
+        super(rpcServer, channel, lastContact);
       }
       @Override
@ -329,8 +330,8 @@ public abstract class AbstractTestIPC {
     }
     @Override
-    protected Connection getConnection(SocketChannel channel, long time) {
-      return new FailingConnection(channel, time);
+    protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
+      return new FailingConnection(this, channel, time);
     }
   }