HBASE-27768 Race conditions in BlockingRpcConnection (#5154)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
This commit is contained in:
parent
a67a8f7fd3
commit
08489365a5
@ -43,6 +43,7 @@ import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import javax.security.sasl.SaslException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
@ -96,6 +97,13 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
|
||||
justification = "We are always under lock actually")
|
||||
private Thread thread;
|
||||
|
||||
// Used for ensuring two reader threads don't run over each other. Should only be used
|
||||
// in reader thread run() method, to avoid deadlocks with synchronization on BlockingRpcConnection
|
||||
private final Object readerThreadLock = new Object();
|
||||
|
||||
// Used to suffix the threadName in a way that we can differentiate them in logs/thread dumps.
|
||||
private final AtomicInteger attempts = new AtomicInteger();
|
||||
|
||||
// connected socket. protected for writing UT.
|
||||
protected Socket socket = null;
|
||||
private DataInputStream in;
|
||||
@ -323,6 +331,17 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
|
||||
if (thread == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// If closeConn is called while we are in the readResponse method, it's possible that a new
|
||||
// call to setupIOStreams comes in and creates a new value for "thread" before readResponse
|
||||
// finishes. Once readResponse finishes, it will come in here and thread will be non-null
|
||||
// above, but pointing at a new thread. In that case, we should end to avoid a situation
|
||||
// where two threads are forever competing for the same socket.
|
||||
if (!isCurrentThreadExpected()) {
|
||||
LOG.debug("Thread replaced by new connection thread. Ending waitForWork loop.");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!calls.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
@ -336,6 +355,23 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
|
||||
} catch (InterruptedException e) {
|
||||
// Restore interrupt status
|
||||
Thread.currentThread().interrupt();
|
||||
|
||||
String msg = "Interrupted while waiting for work";
|
||||
|
||||
// If we were interrupted by closeConn, it would have set thread to null.
|
||||
// We are synchronized here and if we somehow got interrupted without setting thread to
|
||||
// null, we want to make sure the connection is closed since the read thread would be dead.
|
||||
// Rather than do a null check here, we check if the current thread is the expected thread.
|
||||
// This guards against the case where a call to setupIOStreams got the synchronized lock
|
||||
// first after closeConn, thus changing the thread to a new thread.
|
||||
if (isCurrentThreadExpected()) {
|
||||
LOG.debug(msg + ", closing connection");
|
||||
closeConn(new InterruptedIOException(msg));
|
||||
} else {
|
||||
LOG.debug(msg);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -343,13 +379,24 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(threadName + ": starting");
|
||||
LOG.trace("starting");
|
||||
}
|
||||
while (waitForWork()) {
|
||||
readResponse();
|
||||
|
||||
// We have a synchronization here because it's possible in error scenarios for a new
|
||||
// thread to be started while readResponse is still reading on the socket. We don't want
|
||||
// two threads to be reading from the same socket/inputstream.
|
||||
// The below calls can synchronize on "BlockingRpcConnection.this".
|
||||
// We should not synchronize on readerThreadLock anywhere else, to avoid deadlocks
|
||||
synchronized (readerThreadLock) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("started");
|
||||
}
|
||||
while (waitForWork()) {
|
||||
readResponse();
|
||||
}
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(threadName + ": stopped");
|
||||
LOG.trace("stopped");
|
||||
}
|
||||
}
|
||||
|
||||
@ -522,7 +569,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
|
||||
}
|
||||
|
||||
// start the receiver thread after the socket connection has been set up
|
||||
thread = new Thread(this, threadName);
|
||||
thread = new Thread(this, threadName + " (attempt: " + attempts.incrementAndGet() + ")");
|
||||
thread.setDaemon(true);
|
||||
thread.start();
|
||||
}
|
||||
@ -629,7 +676,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
|
||||
call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
|
||||
} catch (Throwable t) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Error while writing {}", call.toShortString());
|
||||
LOG.trace("Error while writing {}", call.toShortString(), t);
|
||||
}
|
||||
IOException e = IPCUtil.toIOE(t);
|
||||
closeConn(e);
|
||||
@ -716,16 +763,33 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
|
||||
// since we expect certain responses to not make it by the specified
|
||||
// {@link ConnectionId#rpcTimeout}.
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("ignored", e);
|
||||
LOG.trace("ignored ex for call {}", call, e);
|
||||
}
|
||||
} else {
|
||||
synchronized (this) {
|
||||
closeConn(e);
|
||||
// The exception we received may have been caused by another thread closing
|
||||
// this connection. It's possible that before getting to this point, a new connection was
|
||||
// created. In that case, it doesn't help and can actually hurt to close again here.
|
||||
if (isCurrentThreadExpected()) {
|
||||
LOG.debug("Closing connection after error in call {}", call, e);
|
||||
closeConn(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For use in the reader thread, tests if the current reader thread is the one expected to be
|
||||
* running. When closeConn is called, the reader thread is expected to end. setupIOStreams then
|
||||
* creates a new thread and updates the thread pointer. At that point, the new thread should be
|
||||
* the only one running. We use this method to guard against cases where the old thread may be
|
||||
* erroneously running or closing the connection in error states.
|
||||
*/
|
||||
private boolean isCurrentThreadExpected() {
|
||||
return thread == Thread.currentThread();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void callTimeout(Call call) {
|
||||
// call sender
|
||||
|
Loading…
x
Reference in New Issue
Block a user