HBASE-24506 async client deadlock (#1858)
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
parent
022dd9687f
commit
754ac1d06d
|
@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
|
|||
import java.net.SocketTimeoutException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import org.apache.commons.lang3.mutable.MutableInt;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -37,10 +38,14 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
|
||||
|
@ -234,4 +239,35 @@ class IPCUtil {
|
|||
+ (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
|
||||
+ call.timeout));
|
||||
}
|
||||
|
||||
private static final FastThreadLocal<MutableInt> DEPTH = new FastThreadLocal<MutableInt>() {
|
||||
|
||||
@Override
|
||||
protected MutableInt initialValue() throws Exception {
|
||||
return new MutableInt(0);
|
||||
}
|
||||
};
|
||||
|
||||
@VisibleForTesting
|
||||
static final int MAX_DEPTH = 4;
|
||||
|
||||
static void execute(EventLoop eventLoop, Runnable action) {
|
||||
if (eventLoop.inEventLoop()) {
|
||||
// this is used to prevent stack overflow, you can see the same trick in netty's LocalChannel
|
||||
// implementation.
|
||||
MutableInt depth = DEPTH.get();
|
||||
if (depth.intValue() < MAX_DEPTH) {
|
||||
depth.increment();
|
||||
try {
|
||||
action.run();
|
||||
} finally {
|
||||
depth.decrement();
|
||||
}
|
||||
} else {
|
||||
eventLoop.execute(action);
|
||||
}
|
||||
} else {
|
||||
eventLoop.execute(action);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,13 +19,27 @@ package org.apache.hadoop.hbase.ipc;
|
|||
|
||||
import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
|
||||
import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
|
||||
import static org.apache.hadoop.hbase.ipc.IPCUtil.execute;
|
||||
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
|
||||
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
|
||||
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
|
||||
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
|
||||
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
|
||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
|
||||
|
@ -36,35 +50,25 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
|
|||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
|
||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
|
||||
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
|
||||
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
/**
|
||||
* RPC connection implementation based on netty.
|
||||
* <p>
|
||||
* <p/>
|
||||
* Most operations are executed in handlers. Netty handler is always executed in the same
|
||||
* thread(EventLoop) so no lock is needed.
|
||||
* <p/>
|
||||
* <strong>Implementation assumptions:</strong> All the private methods should be called in the
|
||||
* {@link #eventLoop} thread, otherwise there will be races.
|
||||
* @since 2.0.0
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
|
@ -73,25 +77,30 @@ class NettyRpcConnection extends RpcConnection {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);
|
||||
|
||||
private static final ScheduledExecutorService RELOGIN_EXECUTOR =
|
||||
Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
|
||||
Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
|
||||
|
||||
private final NettyRpcClient rpcClient;
|
||||
|
||||
// the event loop used to set up the connection, we will also execute other operations for this
|
||||
// connection in this event loop, to avoid locking everywhere.
|
||||
private final EventLoop eventLoop;
|
||||
|
||||
private ByteBuf connectionHeaderPreamble;
|
||||
|
||||
private ByteBuf connectionHeaderWithLength;
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
|
||||
justification = "connect is also under lock as notifyOnCancel will call our action directly")
|
||||
private Channel channel;
|
||||
// make it volatile so in the isActive method below we do not need to switch to the event loop
|
||||
// thread to access this field.
|
||||
private volatile Channel channel;
|
||||
|
||||
NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
|
||||
super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
|
||||
rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
|
||||
rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
|
||||
this.rpcClient = rpcClient;
|
||||
this.eventLoop = rpcClient.group.next();
|
||||
byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
|
||||
this.connectionHeaderPreamble =
|
||||
Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
|
||||
Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
|
||||
ConnectionHeader header = getConnectionHeader();
|
||||
this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
|
||||
this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
|
||||
|
@ -99,18 +108,21 @@ class NettyRpcConnection extends RpcConnection {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void callTimeout(Call call) {
|
||||
if (channel != null) {
|
||||
channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
|
||||
}
|
||||
protected void callTimeout(Call call) {
|
||||
execute(eventLoop, () -> {
|
||||
if (channel != null) {
|
||||
channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean isActive() {
|
||||
public boolean isActive() {
|
||||
return channel != null;
|
||||
}
|
||||
|
||||
private void shutdown0() {
|
||||
assert eventLoop.inEventLoop();
|
||||
if (channel != null) {
|
||||
channel.close();
|
||||
channel = null;
|
||||
|
@ -118,21 +130,26 @@ class NettyRpcConnection extends RpcConnection {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void shutdown() {
|
||||
shutdown0();
|
||||
public void shutdown() {
|
||||
execute(eventLoop, this::shutdown0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void cleanupConnection() {
|
||||
if (connectionHeaderPreamble != null) {
|
||||
ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
|
||||
}
|
||||
if (connectionHeaderWithLength != null) {
|
||||
ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
|
||||
}
|
||||
public void cleanupConnection() {
|
||||
execute(eventLoop, () -> {
|
||||
if (connectionHeaderPreamble != null) {
|
||||
ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
|
||||
connectionHeaderPreamble = null;
|
||||
}
|
||||
if (connectionHeaderWithLength != null) {
|
||||
ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
|
||||
connectionHeaderWithLength = null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void established(Channel ch) throws IOException {
|
||||
assert eventLoop.inEventLoop();
|
||||
ChannelPipeline p = ch.pipeline();
|
||||
String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
|
||||
p.addBefore(addBeforeHandler, null,
|
||||
|
@ -146,6 +163,7 @@ class NettyRpcConnection extends RpcConnection {
|
|||
private boolean reloginInProgress;
|
||||
|
||||
private void scheduleRelogin(Throwable error) {
|
||||
assert eventLoop.inEventLoop();
|
||||
if (error instanceof FallbackDisallowedException) {
|
||||
return;
|
||||
}
|
||||
|
@ -153,38 +171,31 @@ class NettyRpcConnection extends RpcConnection {
|
|||
LOG.trace("SASL Provider does not support retries");
|
||||
return;
|
||||
}
|
||||
synchronized (this) {
|
||||
if (reloginInProgress) {
|
||||
return;
|
||||
}
|
||||
reloginInProgress = true;
|
||||
RELOGIN_EXECUTOR.schedule(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
provider.relogin();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Relogin failed", e);
|
||||
}
|
||||
synchronized (this) {
|
||||
reloginInProgress = false;
|
||||
}
|
||||
}
|
||||
}, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
|
||||
if (reloginInProgress) {
|
||||
return;
|
||||
}
|
||||
reloginInProgress = true;
|
||||
RELOGIN_EXECUTOR.schedule(() -> {
|
||||
try {
|
||||
provider.relogin();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Relogin failed", e);
|
||||
}
|
||||
eventLoop.execute(() -> {
|
||||
reloginInProgress = false;
|
||||
});
|
||||
}, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
private void failInit(Channel ch, IOException e) {
|
||||
synchronized (this) {
|
||||
// fail all pending calls
|
||||
ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
|
||||
shutdown0();
|
||||
return;
|
||||
}
|
||||
assert eventLoop.inEventLoop();
|
||||
// fail all pending calls
|
||||
ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
|
||||
shutdown0();
|
||||
}
|
||||
|
||||
private void saslNegotiate(final Channel ch) {
|
||||
assert eventLoop.inEventLoop();
|
||||
UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket());
|
||||
if (ticket == null) {
|
||||
failInit(ch, new FatalConnectionException("ticket/user is null"));
|
||||
|
@ -194,7 +205,7 @@ class NettyRpcConnection extends RpcConnection {
|
|||
final NettyHBaseSaslRpcClientHandler saslHandler;
|
||||
try {
|
||||
saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
|
||||
serverAddress, securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
|
||||
serverAddress, securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
|
||||
} catch (IOException e) {
|
||||
failInit(ch, e);
|
||||
return;
|
||||
|
@ -214,7 +225,7 @@ class NettyRpcConnection extends RpcConnection {
|
|||
Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
|
||||
// create the handler to handle the connection header
|
||||
ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
|
||||
connectionHeaderPromise, conf, connectionHeaderWithLength);
|
||||
connectionHeaderPromise, conf, connectionHeaderWithLength);
|
||||
|
||||
// add ReadTimeoutHandler to deal with server doesn't response connection header
|
||||
// because of the different configuration in client side and server side
|
||||
|
@ -253,52 +264,38 @@ class NettyRpcConnection extends RpcConnection {
|
|||
}
|
||||
|
||||
private void connect() {
|
||||
assert eventLoop.inEventLoop();
|
||||
LOG.trace("Connecting to {}", remoteId.address);
|
||||
|
||||
this.channel = new Bootstrap().group(rpcClient.group).channel(rpcClient.channelClass)
|
||||
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
|
||||
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
|
||||
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
|
||||
.handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
|
||||
.remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {
|
||||
this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
|
||||
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
|
||||
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
|
||||
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
|
||||
.handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
|
||||
.remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {
|
||||
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
Channel ch = future.channel();
|
||||
if (!future.isSuccess()) {
|
||||
failInit(ch, toIOE(future.cause()));
|
||||
rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
|
||||
return;
|
||||
}
|
||||
ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
|
||||
if (useSasl) {
|
||||
saslNegotiate(ch);
|
||||
} else {
|
||||
// send the connection header to server
|
||||
ch.write(connectionHeaderWithLength.retainedDuplicate());
|
||||
established(ch);
|
||||
}
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
Channel ch = future.channel();
|
||||
if (!future.isSuccess()) {
|
||||
failInit(ch, toIOE(future.cause()));
|
||||
rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
|
||||
return;
|
||||
}
|
||||
ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
|
||||
if (useSasl) {
|
||||
saslNegotiate(ch);
|
||||
} else {
|
||||
// send the connection header to server
|
||||
ch.write(connectionHeaderWithLength.retainedDuplicate());
|
||||
established(ch);
|
||||
}
|
||||
}).channel();
|
||||
}
|
||||
|
||||
private void write(Channel ch, final Call call) {
|
||||
ch.writeAndFlush(call).addListener(new ChannelFutureListener() {
|
||||
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
// Fail the call if we failed to write it out. This usually because the channel is
|
||||
// closed. This is needed because we may shutdown the channel inside event loop and
|
||||
// there may still be some pending calls in the event loop queue after us.
|
||||
if (!future.isSuccess()) {
|
||||
call.setException(toIOE(future.cause()));
|
||||
}
|
||||
}
|
||||
});
|
||||
}).channel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void sendRequest(final Call call, HBaseRpcController hrc) throws IOException {
|
||||
private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException {
|
||||
assert eventLoop.inEventLoop();
|
||||
if (reloginInProgress) {
|
||||
throw new IOException("Can not send request because relogin is in progress.");
|
||||
}
|
||||
|
@ -307,10 +304,8 @@ class NettyRpcConnection extends RpcConnection {
|
|||
@Override
|
||||
public void run(Object parameter) {
|
||||
setCancelled(call);
|
||||
synchronized (this) {
|
||||
if (channel != null) {
|
||||
channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
|
||||
}
|
||||
if (channel != null) {
|
||||
channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
|
||||
}
|
||||
}
|
||||
}, new CancellationCallback() {
|
||||
|
@ -324,31 +319,31 @@ class NettyRpcConnection extends RpcConnection {
|
|||
connect();
|
||||
}
|
||||
scheduleTimeoutTask(call);
|
||||
final Channel ch = channel;
|
||||
// We must move the whole writeAndFlush call inside event loop otherwise there will be a
|
||||
// race condition.
|
||||
// In netty's DefaultChannelPipeline, it will find the first outbound handler in the
|
||||
// current thread and then schedule a task to event loop which will start the process from
|
||||
// that outbound handler. It is possible that the first handler is
|
||||
// BufferCallBeforeInitHandler when we call writeAndFlush here, but the connection is set
|
||||
// up at the same time so in the event loop thread we remove the
|
||||
// BufferCallBeforeInitHandler, and then our writeAndFlush task comes, still calls the
|
||||
// write method of BufferCallBeforeInitHandler.
|
||||
// This may be considered as a bug of netty, but anyway there is a work around so let's
|
||||
// fix it by ourselves first.
|
||||
if (ch.eventLoop().inEventLoop()) {
|
||||
write(ch, call);
|
||||
} else {
|
||||
ch.eventLoop().execute(new Runnable() {
|
||||
channel.writeAndFlush(call).addListener(new ChannelFutureListener() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
write(ch, call);
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
// Fail the call if we failed to write it out. This usually because the channel is
|
||||
// closed. This is needed because we may shutdown the channel inside event loop and
|
||||
// there may still be some pending calls in the event loop queue after us.
|
||||
if (!future.isSuccess()) {
|
||||
call.setException(toIOE(future.cause()));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(final Call call, HBaseRpcController hrc) {
|
||||
execute(eventLoop, () -> {
|
||||
try {
|
||||
sendRequest0(call, hrc);
|
||||
} catch (Exception e) {
|
||||
call.setException(toIOE(e));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,23 +18,30 @@
|
|||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import org.apache.commons.lang3.mutable.MutableInt;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.DefaultEventLoop;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
|
||||
|
||||
@Category({ ClientTests.class, SmallTests.class })
|
||||
public class TestIPCUtil {
|
||||
|
||||
|
@ -43,7 +50,7 @@ public class TestIPCUtil {
|
|||
HBaseClassTestRule.forClass(TestIPCUtil.class);
|
||||
|
||||
private static Throwable create(Class<? extends Throwable> clazz) throws InstantiationException,
|
||||
IllegalAccessException, InvocationTargetException, NoSuchMethodException {
|
||||
IllegalAccessException, InvocationTargetException, NoSuchMethodException {
|
||||
try {
|
||||
Constructor<? extends Throwable> c = clazz.getDeclaredConstructor();
|
||||
c.setAccessible(true);
|
||||
|
@ -102,4 +109,44 @@ public class TestIPCUtil {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecute() throws IOException {
|
||||
EventLoop eventLoop = new DefaultEventLoop();
|
||||
MutableInt executed = new MutableInt(0);
|
||||
MutableInt numStackTraceElements = new MutableInt(0);
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
try {
|
||||
IPCUtil.execute(eventLoop, new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
int numElements = new Exception().getStackTrace().length;
|
||||
int depth = executed.getAndIncrement();
|
||||
if (depth <= IPCUtil.MAX_DEPTH) {
|
||||
if (numElements <= numStackTraceElements.intValue()) {
|
||||
future.completeExceptionally(
|
||||
new AssertionError("should call run directly but stack trace decreased from " +
|
||||
numStackTraceElements.intValue() + " to " + numElements));
|
||||
return;
|
||||
}
|
||||
numStackTraceElements.setValue(numElements);
|
||||
IPCUtil.execute(eventLoop, this);
|
||||
} else {
|
||||
if (numElements >= numStackTraceElements.intValue()) {
|
||||
future.completeExceptionally(
|
||||
new AssertionError("should call eventLoop.execute to prevent stack overflow but" +
|
||||
" stack trace increased from " + numStackTraceElements.intValue() + " to " +
|
||||
numElements));
|
||||
} else {
|
||||
future.complete(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
FutureUtils.get(future);
|
||||
} finally {
|
||||
eventLoop.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.net.InetSocketAddress;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
|
||||
public class TestNettyRpcConnection {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestNettyRpcConnection.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestNettyRpcConnection.class);
|
||||
|
||||
private static NettyRpcClient CLIENT;
|
||||
|
||||
private static NettyRpcConnection CONN;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws IOException {
|
||||
CLIENT = new NettyRpcClient(HBaseConfiguration.create());
|
||||
CONN = new NettyRpcConnection(CLIENT,
|
||||
new ConnectionId(User.getCurrent(), "test", new InetSocketAddress("localhost", 1234)));
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws IOException {
|
||||
Closeables.close(CLIENT, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrivateMethodExecutedInEventLoop() throws IllegalAccessException {
|
||||
// make sure the test is executed with "-ea"
|
||||
assertThrows(AssertionError.class, () -> {
|
||||
assert false;
|
||||
});
|
||||
for (Method method : NettyRpcConnection.class.getDeclaredMethods()) {
|
||||
if (Modifier.isPrivate(method.getModifiers()) && !method.getName().contains("$")) {
|
||||
LOG.info("checking {}", method);
|
||||
method.setAccessible(true);
|
||||
// all private methods should be called inside the event loop thread, so calling it from
|
||||
// this thread will cause the "assert eventLoop.inEventLoop();" to fail
|
||||
try {
|
||||
// now there is no primitive parameters for the private methods so let's pass null
|
||||
method.invoke(CONN, new Object[method.getParameterCount()]);
|
||||
fail("should fail with AssertionError");
|
||||
} catch (InvocationTargetException e) {
|
||||
assertThat(e.getCause(), instanceOf(AssertionError.class));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
|
||||
|
@ -58,6 +57,8 @@ import org.junit.experimental.categories.Category;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
|
||||
/**
|
||||
* Will split the table, and move region randomly when testing.
|
||||
*/
|
||||
|
@ -116,7 +117,7 @@ public class TestAsyncTableGetMultiThreaded {
|
|||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
IOUtils.closeQuietly(CONN);
|
||||
Closeables.close(CONN, true);
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue