HBASE-24506 async client deadlock (#1858)
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
parent
467efa573c
commit
108e23630a
|
@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import org.apache.commons.lang3.mutable.MutableInt;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HBaseIOException;
|
import org.apache.hadoop.hbase.HBaseIOException;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
@ -37,10 +38,14 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
|
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
|
||||||
|
@ -234,4 +239,35 @@ class IPCUtil {
|
||||||
+ (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
|
+ (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
|
||||||
+ call.timeout));
|
+ call.timeout));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final FastThreadLocal<MutableInt> DEPTH = new FastThreadLocal<MutableInt>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected MutableInt initialValue() throws Exception {
|
||||||
|
return new MutableInt(0);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static final int MAX_DEPTH = 4;
|
||||||
|
|
||||||
|
static void execute(EventLoop eventLoop, Runnable action) {
|
||||||
|
if (eventLoop.inEventLoop()) {
|
||||||
|
// this is used to prevent stack overflow, you can see the same trick in netty's LocalChannel
|
||||||
|
// implementation.
|
||||||
|
MutableInt depth = DEPTH.get();
|
||||||
|
if (depth.intValue() < MAX_DEPTH) {
|
||||||
|
depth.increment();
|
||||||
|
try {
|
||||||
|
action.run();
|
||||||
|
} finally {
|
||||||
|
depth.decrement();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
eventLoop.execute(action);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
eventLoop.execute(action);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,13 +19,27 @@ package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
|
import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
|
||||||
import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
|
import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
|
||||||
|
import static org.apache.hadoop.hbase.ipc.IPCUtil.execute;
|
||||||
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
|
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
|
||||||
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
|
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
|
||||||
|
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
|
||||||
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
|
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
|
||||||
|
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
||||||
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
|
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
|
||||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
|
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
|
||||||
|
@ -36,35 +50,25 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
|
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
|
||||||
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
|
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
|
||||||
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
|
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
|
||||||
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
|
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
|
||||||
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
|
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
|
|
||||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
|
||||||
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
|
|
||||||
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
|
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RPC connection implementation based on netty.
|
* RPC connection implementation based on netty.
|
||||||
* <p>
|
* <p/>
|
||||||
* Most operations are executed in handlers. Netty handler is always executed in the same
|
* Most operations are executed in handlers. Netty handler is always executed in the same
|
||||||
* thread(EventLoop) so no lock is needed.
|
* thread(EventLoop) so no lock is needed.
|
||||||
|
* <p/>
|
||||||
|
* <strong>Implementation assumptions:</strong> All the private methods should be called in the
|
||||||
|
* {@link #eventLoop} thread, otherwise there will be races.
|
||||||
* @since 2.0.0
|
* @since 2.0.0
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@ -73,25 +77,30 @@ class NettyRpcConnection extends RpcConnection {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);
|
private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);
|
||||||
|
|
||||||
private static final ScheduledExecutorService RELOGIN_EXECUTOR =
|
private static final ScheduledExecutorService RELOGIN_EXECUTOR =
|
||||||
Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
|
Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
|
||||||
|
|
||||||
private final NettyRpcClient rpcClient;
|
private final NettyRpcClient rpcClient;
|
||||||
|
|
||||||
|
// the event loop used to set up the connection, we will also execute other operations for this
|
||||||
|
// connection in this event loop, to avoid locking everywhere.
|
||||||
|
private final EventLoop eventLoop;
|
||||||
|
|
||||||
private ByteBuf connectionHeaderPreamble;
|
private ByteBuf connectionHeaderPreamble;
|
||||||
|
|
||||||
private ByteBuf connectionHeaderWithLength;
|
private ByteBuf connectionHeaderWithLength;
|
||||||
|
|
||||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
|
// make it volatile so in the isActive method below we do not need to switch to the event loop
|
||||||
justification = "connect is also under lock as notifyOnCancel will call our action directly")
|
// thread to access this field.
|
||||||
private Channel channel;
|
private volatile Channel channel;
|
||||||
|
|
||||||
NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
|
NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
|
||||||
super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
|
super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
|
||||||
rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
|
rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
|
||||||
this.rpcClient = rpcClient;
|
this.rpcClient = rpcClient;
|
||||||
|
this.eventLoop = rpcClient.group.next();
|
||||||
byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
|
byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
|
||||||
this.connectionHeaderPreamble =
|
this.connectionHeaderPreamble =
|
||||||
Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
|
Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
|
||||||
ConnectionHeader header = getConnectionHeader();
|
ConnectionHeader header = getConnectionHeader();
|
||||||
this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
|
this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
|
||||||
this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
|
this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
|
||||||
|
@ -99,18 +108,21 @@ class NettyRpcConnection extends RpcConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void callTimeout(Call call) {
|
protected void callTimeout(Call call) {
|
||||||
if (channel != null) {
|
execute(eventLoop, () -> {
|
||||||
channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
|
if (channel != null) {
|
||||||
}
|
channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean isActive() {
|
public boolean isActive() {
|
||||||
return channel != null;
|
return channel != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void shutdown0() {
|
private void shutdown0() {
|
||||||
|
assert eventLoop.inEventLoop();
|
||||||
if (channel != null) {
|
if (channel != null) {
|
||||||
channel.close();
|
channel.close();
|
||||||
channel = null;
|
channel = null;
|
||||||
|
@ -118,21 +130,26 @@ class NettyRpcConnection extends RpcConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void shutdown() {
|
public void shutdown() {
|
||||||
shutdown0();
|
execute(eventLoop, this::shutdown0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void cleanupConnection() {
|
public void cleanupConnection() {
|
||||||
if (connectionHeaderPreamble != null) {
|
execute(eventLoop, () -> {
|
||||||
ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
|
if (connectionHeaderPreamble != null) {
|
||||||
}
|
ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
|
||||||
if (connectionHeaderWithLength != null) {
|
connectionHeaderPreamble = null;
|
||||||
ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
|
}
|
||||||
}
|
if (connectionHeaderWithLength != null) {
|
||||||
|
ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
|
||||||
|
connectionHeaderWithLength = null;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void established(Channel ch) throws IOException {
|
private void established(Channel ch) throws IOException {
|
||||||
|
assert eventLoop.inEventLoop();
|
||||||
ChannelPipeline p = ch.pipeline();
|
ChannelPipeline p = ch.pipeline();
|
||||||
String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
|
String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
|
||||||
p.addBefore(addBeforeHandler, null,
|
p.addBefore(addBeforeHandler, null,
|
||||||
|
@ -146,43 +163,39 @@ class NettyRpcConnection extends RpcConnection {
|
||||||
private boolean reloginInProgress;
|
private boolean reloginInProgress;
|
||||||
|
|
||||||
private void scheduleRelogin(Throwable error) {
|
private void scheduleRelogin(Throwable error) {
|
||||||
|
assert eventLoop.inEventLoop();
|
||||||
if (error instanceof FallbackDisallowedException) {
|
if (error instanceof FallbackDisallowedException) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
synchronized (this) {
|
if (!provider.canRetry()) {
|
||||||
if (reloginInProgress) {
|
LOG.trace("SASL Provider does not support retries");
|
||||||
return;
|
return;
|
||||||
}
|
|
||||||
reloginInProgress = true;
|
|
||||||
RELOGIN_EXECUTOR.schedule(new Runnable() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
if (provider.canRetry()) {
|
|
||||||
provider.relogin();
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Relogin failed", e);
|
|
||||||
}
|
|
||||||
synchronized (this) {
|
|
||||||
reloginInProgress = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
|
|
||||||
}
|
}
|
||||||
|
if (reloginInProgress) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
reloginInProgress = true;
|
||||||
|
RELOGIN_EXECUTOR.schedule(() -> {
|
||||||
|
try {
|
||||||
|
provider.relogin();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Relogin failed", e);
|
||||||
|
}
|
||||||
|
eventLoop.execute(() -> {
|
||||||
|
reloginInProgress = false;
|
||||||
|
});
|
||||||
|
}, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void failInit(Channel ch, IOException e) {
|
private void failInit(Channel ch, IOException e) {
|
||||||
synchronized (this) {
|
assert eventLoop.inEventLoop();
|
||||||
// fail all pending calls
|
// fail all pending calls
|
||||||
ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
|
ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
|
||||||
shutdown0();
|
shutdown0();
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void saslNegotiate(final Channel ch) {
|
private void saslNegotiate(final Channel ch) {
|
||||||
|
assert eventLoop.inEventLoop();
|
||||||
UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket());
|
UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket());
|
||||||
if (ticket == null) {
|
if (ticket == null) {
|
||||||
failInit(ch, new FatalConnectionException("ticket/user is null"));
|
failInit(ch, new FatalConnectionException("ticket/user is null"));
|
||||||
|
@ -192,7 +205,7 @@ class NettyRpcConnection extends RpcConnection {
|
||||||
final NettyHBaseSaslRpcClientHandler saslHandler;
|
final NettyHBaseSaslRpcClientHandler saslHandler;
|
||||||
try {
|
try {
|
||||||
saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
|
saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
|
||||||
serverAddress, securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
|
serverAddress, securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
failInit(ch, e);
|
failInit(ch, e);
|
||||||
return;
|
return;
|
||||||
|
@ -212,7 +225,7 @@ class NettyRpcConnection extends RpcConnection {
|
||||||
Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
|
Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
|
||||||
// create the handler to handle the connection header
|
// create the handler to handle the connection header
|
||||||
ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
|
ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
|
||||||
connectionHeaderPromise, conf, connectionHeaderWithLength);
|
connectionHeaderPromise, conf, connectionHeaderWithLength);
|
||||||
|
|
||||||
// add ReadTimeoutHandler to deal with server doesn't response connection header
|
// add ReadTimeoutHandler to deal with server doesn't response connection header
|
||||||
// because of the different configuration in client side and server side
|
// because of the different configuration in client side and server side
|
||||||
|
@ -251,52 +264,38 @@ class NettyRpcConnection extends RpcConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void connect() {
|
private void connect() {
|
||||||
|
assert eventLoop.inEventLoop();
|
||||||
LOG.trace("Connecting to {}", remoteId.address);
|
LOG.trace("Connecting to {}", remoteId.address);
|
||||||
|
|
||||||
this.channel = new Bootstrap().group(rpcClient.group).channel(rpcClient.channelClass)
|
this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
|
||||||
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
|
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
|
||||||
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
|
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
|
||||||
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
|
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
|
||||||
.handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
|
.handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
|
||||||
.remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {
|
.remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
Channel ch = future.channel();
|
Channel ch = future.channel();
|
||||||
if (!future.isSuccess()) {
|
if (!future.isSuccess()) {
|
||||||
failInit(ch, toIOE(future.cause()));
|
failInit(ch, toIOE(future.cause()));
|
||||||
rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
|
rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
|
ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
|
||||||
if (useSasl) {
|
if (useSasl) {
|
||||||
saslNegotiate(ch);
|
saslNegotiate(ch);
|
||||||
} else {
|
} else {
|
||||||
// send the connection header to server
|
// send the connection header to server
|
||||||
ch.write(connectionHeaderWithLength.retainedDuplicate());
|
ch.write(connectionHeaderWithLength.retainedDuplicate());
|
||||||
established(ch);
|
established(ch);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}).channel();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void write(Channel ch, final Call call) {
|
|
||||||
ch.writeAndFlush(call).addListener(new ChannelFutureListener() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
|
||||||
// Fail the call if we failed to write it out. This usually because the channel is
|
|
||||||
// closed. This is needed because we may shutdown the channel inside event loop and
|
|
||||||
// there may still be some pending calls in the event loop queue after us.
|
|
||||||
if (!future.isSuccess()) {
|
|
||||||
call.setException(toIOE(future.cause()));
|
|
||||||
}
|
}
|
||||||
}
|
}).channel();
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException {
|
||||||
public synchronized void sendRequest(final Call call, HBaseRpcController hrc) throws IOException {
|
assert eventLoop.inEventLoop();
|
||||||
if (reloginInProgress) {
|
if (reloginInProgress) {
|
||||||
throw new IOException("Can not send request because relogin is in progress.");
|
throw new IOException("Can not send request because relogin is in progress.");
|
||||||
}
|
}
|
||||||
|
@ -305,10 +304,8 @@ class NettyRpcConnection extends RpcConnection {
|
||||||
@Override
|
@Override
|
||||||
public void run(Object parameter) {
|
public void run(Object parameter) {
|
||||||
setCancelled(call);
|
setCancelled(call);
|
||||||
synchronized (this) {
|
if (channel != null) {
|
||||||
if (channel != null) {
|
channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
|
||||||
channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, new CancellationCallback() {
|
}, new CancellationCallback() {
|
||||||
|
@ -322,31 +319,31 @@ class NettyRpcConnection extends RpcConnection {
|
||||||
connect();
|
connect();
|
||||||
}
|
}
|
||||||
scheduleTimeoutTask(call);
|
scheduleTimeoutTask(call);
|
||||||
final Channel ch = channel;
|
channel.writeAndFlush(call).addListener(new ChannelFutureListener() {
|
||||||
// We must move the whole writeAndFlush call inside event loop otherwise there will be a
|
|
||||||
// race condition.
|
|
||||||
// In netty's DefaultChannelPipeline, it will find the first outbound handler in the
|
|
||||||
// current thread and then schedule a task to event loop which will start the process from
|
|
||||||
// that outbound handler. It is possible that the first handler is
|
|
||||||
// BufferCallBeforeInitHandler when we call writeAndFlush here, but the connection is set
|
|
||||||
// up at the same time so in the event loop thread we remove the
|
|
||||||
// BufferCallBeforeInitHandler, and then our writeAndFlush task comes, still calls the
|
|
||||||
// write method of BufferCallBeforeInitHandler.
|
|
||||||
// This may be considered as a bug of netty, but anyway there is a work around so let's
|
|
||||||
// fix it by ourselves first.
|
|
||||||
if (ch.eventLoop().inEventLoop()) {
|
|
||||||
write(ch, call);
|
|
||||||
} else {
|
|
||||||
ch.eventLoop().execute(new Runnable() {
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
write(ch, call);
|
// Fail the call if we failed to write it out. This usually because the channel is
|
||||||
|
// closed. This is needed because we may shutdown the channel inside event loop and
|
||||||
|
// there may still be some pending calls in the event loop queue after us.
|
||||||
|
if (!future.isSuccess()) {
|
||||||
|
call.setException(toIOE(future.cause()));
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
}
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sendRequest(final Call call, HBaseRpcController hrc) {
|
||||||
|
execute(eventLoop, () -> {
|
||||||
|
try {
|
||||||
|
sendRequest0(call, hrc);
|
||||||
|
} catch (Exception e) {
|
||||||
|
call.setException(toIOE(e));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,23 +18,30 @@
|
||||||
package org.apache.hadoop.hbase.ipc;
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import org.apache.commons.lang3.mutable.MutableInt;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
||||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.channel.DefaultEventLoop;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
|
||||||
|
|
||||||
@Category({ ClientTests.class, SmallTests.class })
|
@Category({ ClientTests.class, SmallTests.class })
|
||||||
public class TestIPCUtil {
|
public class TestIPCUtil {
|
||||||
|
|
||||||
|
@ -43,7 +50,7 @@ public class TestIPCUtil {
|
||||||
HBaseClassTestRule.forClass(TestIPCUtil.class);
|
HBaseClassTestRule.forClass(TestIPCUtil.class);
|
||||||
|
|
||||||
private static Throwable create(Class<? extends Throwable> clazz) throws InstantiationException,
|
private static Throwable create(Class<? extends Throwable> clazz) throws InstantiationException,
|
||||||
IllegalAccessException, InvocationTargetException, NoSuchMethodException {
|
IllegalAccessException, InvocationTargetException, NoSuchMethodException {
|
||||||
try {
|
try {
|
||||||
Constructor<? extends Throwable> c = clazz.getDeclaredConstructor();
|
Constructor<? extends Throwable> c = clazz.getDeclaredConstructor();
|
||||||
c.setAccessible(true);
|
c.setAccessible(true);
|
||||||
|
@ -102,4 +109,44 @@ public class TestIPCUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExecute() throws IOException {
|
||||||
|
EventLoop eventLoop = new DefaultEventLoop();
|
||||||
|
MutableInt executed = new MutableInt(0);
|
||||||
|
MutableInt numStackTraceElements = new MutableInt(0);
|
||||||
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
try {
|
||||||
|
IPCUtil.execute(eventLoop, new Runnable() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
int numElements = new Exception().getStackTrace().length;
|
||||||
|
int depth = executed.getAndIncrement();
|
||||||
|
if (depth <= IPCUtil.MAX_DEPTH) {
|
||||||
|
if (numElements <= numStackTraceElements.intValue()) {
|
||||||
|
future.completeExceptionally(
|
||||||
|
new AssertionError("should call run directly but stack trace decreased from " +
|
||||||
|
numStackTraceElements.intValue() + " to " + numElements));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
numStackTraceElements.setValue(numElements);
|
||||||
|
IPCUtil.execute(eventLoop, this);
|
||||||
|
} else {
|
||||||
|
if (numElements >= numStackTraceElements.intValue()) {
|
||||||
|
future.completeExceptionally(
|
||||||
|
new AssertionError("should call eventLoop.execute to prevent stack overflow but" +
|
||||||
|
" stack trace increased from " + numStackTraceElements.intValue() + " to " +
|
||||||
|
numElements));
|
||||||
|
} else {
|
||||||
|
future.complete(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
FutureUtils.get(future);
|
||||||
|
} finally {
|
||||||
|
eventLoop.shutdownGracefully();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,92 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.junit.Assert.assertThrows;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.lang.reflect.Modifier;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||||
|
|
||||||
|
@Category({ ClientTests.class, SmallTests.class })
|
||||||
|
public class TestNettyRpcConnection {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestNettyRpcConnection.class);
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TestNettyRpcConnection.class);
|
||||||
|
|
||||||
|
private static NettyRpcClient CLIENT;
|
||||||
|
|
||||||
|
private static NettyRpcConnection CONN;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws IOException {
|
||||||
|
CLIENT = new NettyRpcClient(HBaseConfiguration.create());
|
||||||
|
CONN = new NettyRpcConnection(CLIENT,
|
||||||
|
new ConnectionId(User.getCurrent(), "test", new InetSocketAddress("localhost", 1234)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws IOException {
|
||||||
|
Closeables.close(CLIENT, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPrivateMethodExecutedInEventLoop() throws IllegalAccessException {
|
||||||
|
// make sure the test is executed with "-ea"
|
||||||
|
assertThrows(AssertionError.class, () -> {
|
||||||
|
assert false;
|
||||||
|
});
|
||||||
|
for (Method method : NettyRpcConnection.class.getDeclaredMethods()) {
|
||||||
|
if (Modifier.isPrivate(method.getModifiers()) && !method.getName().contains("$")) {
|
||||||
|
LOG.info("checking {}", method);
|
||||||
|
method.setAccessible(true);
|
||||||
|
// all private methods should be called inside the event loop thread, so calling it from
|
||||||
|
// this thread will cause the "assert eventLoop.inEventLoop();" to fail
|
||||||
|
try {
|
||||||
|
// now there is no primitive parameters for the private methods so let's pass null
|
||||||
|
method.invoke(CONN, new Object[method.getParameterCount()]);
|
||||||
|
fail("should fail with AssertionError");
|
||||||
|
} catch (InvocationTargetException e) {
|
||||||
|
assertThat(e.getCause(), instanceOf(AssertionError.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
|
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
|
||||||
|
@ -58,6 +57,8 @@ import org.junit.experimental.categories.Category;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Will split the table, and move region randomly when testing.
|
* Will split the table, and move region randomly when testing.
|
||||||
*/
|
*/
|
||||||
|
@ -116,7 +117,7 @@ public class TestAsyncTableGetMultiThreaded {
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDown() throws Exception {
|
public static void tearDown() throws Exception {
|
||||||
IOUtils.closeQuietly(CONN);
|
Closeables.close(CONN, true);
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue