HBASE-14449 Rewrite deadlock prevention for concurrent connection close
This commit is contained in:
parent
7fb12e3331
commit
00f467b225
|
@ -88,10 +88,12 @@ import java.net.UnknownHostException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
|
@ -152,18 +154,6 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* This is the return value from {@link #waitForWork()} indicating whether run() method should:
|
|
||||||
* read response
|
|
||||||
* close the connection
|
|
||||||
* take no action - connection would be closed by others
|
|
||||||
*/
|
|
||||||
private enum WaitForWorkResult {
|
|
||||||
READ_RESPONSE,
|
|
||||||
CALLER_SHOULD_CLOSE,
|
|
||||||
CLOSED
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Thread that reads responses and notifies callers. Each connection owns a
|
/** Thread that reads responses and notifies callers. Each connection owns a
|
||||||
* socket connected to a remote address. Calls are multiplexed through this
|
* socket connected to a remote address. Calls are multiplexed through this
|
||||||
* socket: responses may be delivered out of order. */
|
* socket: responses may be delivered out of order. */
|
||||||
|
@ -255,13 +245,12 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
boolean closeBySelf = false;
|
|
||||||
while (!shouldCloseConnection.get()) {
|
while (!shouldCloseConnection.get()) {
|
||||||
CallFuture cts = null;
|
CallFuture cts = null;
|
||||||
try {
|
try {
|
||||||
cts = callsToWrite.take();
|
cts = callsToWrite.take();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
closeBySelf = markClosed(new InterruptedIOException());
|
markClosed(new InterruptedIOException());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cts == null || cts == CallFuture.DEATH_PILL) {
|
if (cts == null || cts == CallFuture.DEATH_PILL) {
|
||||||
|
@ -285,14 +274,11 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
+ ", message =" + e.getMessage());
|
+ ", message =" + e.getMessage());
|
||||||
}
|
}
|
||||||
cts.call.setException(e);
|
cts.call.setException(e);
|
||||||
closeBySelf = markClosed(e);
|
markClosed(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup();
|
cleanup();
|
||||||
if (closeBySelf) {
|
|
||||||
close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -526,28 +512,27 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
* it is idle too long, it is marked as to be closed,
|
* it is idle too long, it is marked as to be closed,
|
||||||
* or the client is marked as not running.
|
* or the client is marked as not running.
|
||||||
*
|
*
|
||||||
* @return WaitForWorkResult indicating whether it is time to read response;
|
* @return true if it is time to read a response; false otherwise.
|
||||||
* if the caller should close; or otherwise
|
|
||||||
*/
|
*/
|
||||||
protected synchronized WaitForWorkResult waitForWork() throws InterruptedException {
|
protected synchronized boolean waitForWork() throws InterruptedException {
|
||||||
// beware of the concurrent access to the calls list: we can add calls, but as well
|
// beware of the concurrent access to the calls list: we can add calls, but as well
|
||||||
// remove them.
|
// remove them.
|
||||||
long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;
|
long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (shouldCloseConnection.get()) {
|
if (shouldCloseConnection.get()) {
|
||||||
return WaitForWorkResult.CLOSED;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!running.get()) {
|
if (!running.get()) {
|
||||||
if (markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"))) {
|
markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
|
||||||
return WaitForWorkResult.CALLER_SHOULD_CLOSE;
|
return false;
|
||||||
}
|
|
||||||
return WaitForWorkResult.CLOSED;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!calls.isEmpty()) {
|
if (!calls.isEmpty()) {
|
||||||
return WaitForWorkResult.READ_RESPONSE;
|
// shouldCloseConnection can be set to true by a parallel thread here. The caller
|
||||||
|
// will need to check anyway.
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
|
if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
|
||||||
|
@ -555,11 +540,9 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
// We expect the number of calls to be zero here, but actually someone can
|
// We expect the number of calls to be zero here, but actually someone can
|
||||||
// adds a call at the any moment, as there is no synchronization between this task
|
// adds a call at the any moment, as there is no synchronization between this task
|
||||||
// and adding new calls. It's not a big issue, but it will get an exception.
|
// and adding new calls. It's not a big issue, but it will get an exception.
|
||||||
if (markClosed(new IOException(
|
markClosed(new IOException(
|
||||||
"idle connection closed with " + calls.size() + " pending request(s)"))) {
|
"idle connection closed with " + calls.size() + " pending request(s)"));
|
||||||
return WaitForWorkResult.CALLER_SHOULD_CLOSE;
|
return false;
|
||||||
}
|
|
||||||
return WaitForWorkResult.CLOSED;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wait(Math.min(minIdleTimeBeforeClose, 1000));
|
wait(Math.min(minIdleTimeBeforeClose, 1000));
|
||||||
|
@ -576,38 +559,24 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
LOG.trace(getName() + ": starting, connections " + connections.size());
|
LOG.trace(getName() + ": starting, connections " + connections.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
WaitForWorkResult result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
|
|
||||||
try {
|
try {
|
||||||
result = waitForWork(); // Wait here for work - read or close connection
|
while (waitForWork()) { // Wait here for work - read or close connection
|
||||||
while (result == WaitForWorkResult.READ_RESPONSE) {
|
readResponse();
|
||||||
if (readResponse()) {
|
|
||||||
// shouldCloseConnection is set to true by readResponse(). Close the connection
|
|
||||||
result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
|
|
||||||
} else {
|
|
||||||
result = waitForWork();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} catch (InterruptedException t) {
|
} catch (InterruptedException t) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(getName() + ": interrupted while waiting for call responses");
|
LOG.trace(getName() + ": interrupted while waiting for call responses");
|
||||||
}
|
}
|
||||||
if (markClosed(ExceptionUtil.asInterrupt(t))) {
|
markClosed(ExceptionUtil.asInterrupt(t));
|
||||||
// shouldCloseConnection is set to true. Close connection
|
|
||||||
result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
|
LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
|
||||||
}
|
}
|
||||||
if (markClosed(new IOException("Unexpected throwable while waiting call responses", t))) {
|
markClosed(new IOException("Unexpected throwable while waiting call responses", t));
|
||||||
// shouldCloseConnection is set to true. Close connection
|
|
||||||
result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (result == WaitForWorkResult.CALLER_SHOULD_CLOSE) {
|
|
||||||
close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
close();
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(getName() + ": stopped, connections " + connections.size());
|
LOG.trace(getName() + ": stopped, connections " + connections.size());
|
||||||
}
|
}
|
||||||
|
@ -735,9 +704,8 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
}
|
}
|
||||||
IOException e = new FailedServerException(
|
IOException e = new FailedServerException(
|
||||||
"This server is in the failed servers list: " + server);
|
"This server is in the failed servers list: " + server);
|
||||||
if (markClosed(e)) {
|
markClosed(e);
|
||||||
close();
|
close();
|
||||||
}
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -815,9 +783,8 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
e = new IOException("Could not set up IO Streams to " + server, t);
|
e = new IOException("Could not set up IO Streams to " + server, t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (markClosed(e)) {
|
markClosed(e);
|
||||||
close();
|
close();
|
||||||
}
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -948,9 +915,8 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// We set the value inside the synchronized block, this way the next in line
|
// We set the value inside the synchronized block, this way the next in line
|
||||||
// won't even try to write
|
// won't even try to write
|
||||||
if (markClosed(e)) {
|
markClosed(e);
|
||||||
close();
|
close();
|
||||||
}
|
|
||||||
writeException = e;
|
writeException = e;
|
||||||
interrupt();
|
interrupt();
|
||||||
}
|
}
|
||||||
|
@ -968,10 +934,9 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
|
|
||||||
/* Receive a response.
|
/* Receive a response.
|
||||||
* Because only one receiver, so no synchronization on in.
|
* Because only one receiver, so no synchronization on in.
|
||||||
* @return true if connection should be closed by caller
|
|
||||||
*/
|
*/
|
||||||
protected boolean readResponse() {
|
protected void readResponse() {
|
||||||
if (shouldCloseConnection.get()) return false;
|
if (shouldCloseConnection.get()) return;
|
||||||
Call call = null;
|
Call call = null;
|
||||||
boolean expectedCall = false;
|
boolean expectedCall = false;
|
||||||
try {
|
try {
|
||||||
|
@ -993,14 +958,14 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
|
int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
|
||||||
int whatIsLeftToRead = totalSize - readSoFar;
|
int whatIsLeftToRead = totalSize - readSoFar;
|
||||||
IOUtils.skipFully(in, whatIsLeftToRead);
|
IOUtils.skipFully(in, whatIsLeftToRead);
|
||||||
return false;
|
return;
|
||||||
}
|
}
|
||||||
if (responseHeader.hasException()) {
|
if (responseHeader.hasException()) {
|
||||||
ExceptionResponse exceptionResponse = responseHeader.getException();
|
ExceptionResponse exceptionResponse = responseHeader.getException();
|
||||||
RemoteException re = createRemoteException(exceptionResponse);
|
RemoteException re = createRemoteException(exceptionResponse);
|
||||||
call.setException(re);
|
call.setException(re);
|
||||||
if (isFatalConnectionException(exceptionResponse)) {
|
if (isFatalConnectionException(exceptionResponse)) {
|
||||||
return markClosed(re);
|
markClosed(re);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Message value = null;
|
Message value = null;
|
||||||
|
@ -1027,12 +992,11 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
|
if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
|
||||||
} else {
|
} else {
|
||||||
// Treat this as a fatal condition and close this connection
|
// Treat this as a fatal condition and close this connection
|
||||||
return markClosed(e);
|
markClosed(e);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
cleanupCalls(false);
|
cleanupCalls(false);
|
||||||
}
|
}
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1058,22 +1022,18 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
e.getStackTrace(), doNotRetry);
|
e.getStackTrace(), doNotRetry);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
protected synchronized void markClosed(IOException e) {
|
||||||
* @return true if shouldCloseConnection is set true by this thread; false otherwise
|
|
||||||
*/
|
|
||||||
protected boolean markClosed(IOException e) {
|
|
||||||
if (e == null) throw new NullPointerException();
|
if (e == null) throw new NullPointerException();
|
||||||
|
|
||||||
boolean ret = shouldCloseConnection.compareAndSet(false, true);
|
if (shouldCloseConnection.compareAndSet(false, true)) {
|
||||||
if (ret) {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
|
LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
|
||||||
}
|
}
|
||||||
if (callSender != null) {
|
if (callSender != null) {
|
||||||
callSender.close();
|
callSender.close();
|
||||||
}
|
}
|
||||||
|
notifyAll();
|
||||||
}
|
}
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1161,6 +1121,7 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
|
if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
|
||||||
if (!running.compareAndSet(true, false)) return;
|
if (!running.compareAndSet(true, false)) return;
|
||||||
|
|
||||||
|
Set<Connection> connsToClose = null;
|
||||||
// wake up all connections
|
// wake up all connections
|
||||||
synchronized (connections) {
|
synchronized (connections) {
|
||||||
for (Connection conn : connections.values()) {
|
for (Connection conn : connections.values()) {
|
||||||
|
@ -1172,13 +1133,19 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
// In case the CallSender did not setupIOStreams() yet, the Connection may not be started
|
// In case the CallSender did not setupIOStreams() yet, the Connection may not be started
|
||||||
// at all (if CallSender has a cancelled Call it can happen). See HBASE-13851
|
// at all (if CallSender has a cancelled Call it can happen). See HBASE-13851
|
||||||
if (!conn.isAlive()) {
|
if (!conn.isAlive()) {
|
||||||
if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
|
if (connsToClose == null) {
|
||||||
conn.close();
|
connsToClose = new HashSet<Connection>();
|
||||||
}
|
}
|
||||||
|
connsToClose.add(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (connsToClose != null) {
|
||||||
|
for (Connection conn : connsToClose) {
|
||||||
|
conn.markClosed(new InterruptedIOException("RpcClient is closing"));
|
||||||
|
conn.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
// wait until all connections are closed
|
// wait until all connections are closed
|
||||||
while (!connections.isEmpty()) {
|
while (!connections.isEmpty()) {
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue