HBASE-14241 Fix deadlock during cluster shutdown due to concurrent connection close

This commit is contained in:
tedyu 2015-08-19 08:49:38 -07:00
parent 1bb9e3ae96
commit 16f8d27708
1 changed files with 75 additions and 33 deletions

View File

@ -152,6 +152,18 @@ public class RpcClientImpl extends AbstractRpcClient {
} }
} }
/*
* This is the return value from {@link #waitForWork()} indicating whether run() method should:
* read response
* close the connection
* take no action - connection would be closed by others
*/
private enum WaitForWorkResult {
READ_RESPONSE,
CALLER_SHOULD_CLOSE,
CLOSED
}
/** Thread that reads responses and notifies callers. Each connection owns a /** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this * socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */ * socket: responses may be delivered out of order. */
@ -243,12 +255,13 @@ public class RpcClientImpl extends AbstractRpcClient {
*/ */
@Override @Override
public void run() { public void run() {
boolean closeBySelf = false;
while (!shouldCloseConnection.get()) { while (!shouldCloseConnection.get()) {
CallFuture cts = null; CallFuture cts = null;
try { try {
cts = callsToWrite.take(); cts = callsToWrite.take();
} catch (InterruptedException e) { } catch (InterruptedException e) {
markClosed(new InterruptedIOException()); closeBySelf = markClosed(new InterruptedIOException());
} }
if (cts == null || cts == CallFuture.DEATH_PILL) { if (cts == null || cts == CallFuture.DEATH_PILL) {
@ -272,11 +285,14 @@ public class RpcClientImpl extends AbstractRpcClient {
+ ", message =" + e.getMessage()); + ", message =" + e.getMessage());
} }
cts.call.setException(e); cts.call.setException(e);
markClosed(e); closeBySelf = markClosed(e);
} }
} }
cleanup(); cleanup();
if (closeBySelf) {
close();
}
} }
/** /**
@ -510,27 +526,28 @@ public class RpcClientImpl extends AbstractRpcClient {
* it is idle too long, it is marked as to be closed, * it is idle too long, it is marked as to be closed,
* or the client is marked as not running. * or the client is marked as not running.
* *
* @return true if it is time to read a response; false otherwise. * @return WaitForWorkResult indicating whether it is time to read response;
* if the caller should close; or otherwise
*/ */
protected synchronized boolean waitForWork() throws InterruptedException { protected synchronized WaitForWorkResult waitForWork() throws InterruptedException {
// beware of the concurrent access to the calls list: we can add calls, but as well // beware of the concurrent access to the calls list: we can add calls, but as well
// remove them. // remove them.
long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose; long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;
while (true) { while (true) {
if (shouldCloseConnection.get()) { if (shouldCloseConnection.get()) {
return false; return WaitForWorkResult.CLOSED;
} }
if (!running.get()) { if (!running.get()) {
markClosed(new IOException("stopped with " + calls.size() + " pending request(s)")); if (markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"))) {
return false; return WaitForWorkResult.CALLER_SHOULD_CLOSE;
}
return WaitForWorkResult.CLOSED;
} }
if (!calls.isEmpty()) { if (!calls.isEmpty()) {
// shouldCloseConnection can be set to true by a parallel thread here. The caller return WaitForWorkResult.READ_RESPONSE;
// will need to check anyway.
return true;
} }
if (EnvironmentEdgeManager.currentTime() >= waitUntil) { if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
@ -538,9 +555,11 @@ public class RpcClientImpl extends AbstractRpcClient {
// We expect the number of calls to be zero here, but actually someone can // We expect the number of calls to be zero here, but actually someone can
// adds a call at the any moment, as there is no synchronization between this task // adds a call at the any moment, as there is no synchronization between this task
// and adding new calls. It's not a big issue, but it will get an exception. // and adding new calls. It's not a big issue, but it will get an exception.
markClosed(new IOException( if (markClosed(new IOException(
"idle connection closed with " + calls.size() + " pending request(s)")); "idle connection closed with " + calls.size() + " pending request(s)"))) {
return false; return WaitForWorkResult.CALLER_SHOULD_CLOSE;
}
return WaitForWorkResult.CLOSED;
} }
wait(Math.min(minIdleTimeBeforeClose, 1000)); wait(Math.min(minIdleTimeBeforeClose, 1000));
@ -557,23 +576,37 @@ public class RpcClientImpl extends AbstractRpcClient {
LOG.trace(getName() + ": starting, connections " + connections.size()); LOG.trace(getName() + ": starting, connections " + connections.size());
} }
WaitForWorkResult result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
try { try {
while (waitForWork()) { // Wait here for work - read or close connection result = waitForWork(); // Wait here for work - read or close connection
readResponse(); while (result == WaitForWorkResult.READ_RESPONSE) {
if (readResponse()) {
// shouldCloseConnection is set to true by readResponse(). Close the connection
result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
} else {
result = waitForWork();
}
} }
} catch (InterruptedException t) { } catch (InterruptedException t) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(getName() + ": interrupted while waiting for call responses"); LOG.trace(getName() + ": interrupted while waiting for call responses");
} }
markClosed(ExceptionUtil.asInterrupt(t)); if (markClosed(ExceptionUtil.asInterrupt(t))) {
// shouldCloseConnection is set to true. Close connection
result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
}
} catch (Throwable t) { } catch (Throwable t) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t); LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
} }
markClosed(new IOException("Unexpected throwable while waiting call responses", t)); if (markClosed(new IOException("Unexpected throwable while waiting call responses", t))) {
// shouldCloseConnection is set to true. Close connection
result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
}
}
if (result == WaitForWorkResult.CALLER_SHOULD_CLOSE) {
close();
} }
close();
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(getName() + ": stopped, connections " + connections.size()); LOG.trace(getName() + ": stopped, connections " + connections.size());
@ -702,8 +735,9 @@ public class RpcClientImpl extends AbstractRpcClient {
} }
IOException e = new FailedServerException( IOException e = new FailedServerException(
"This server is in the failed servers list: " + server); "This server is in the failed servers list: " + server);
markClosed(e); if (markClosed(e)) {
close(); close();
}
throw e; throw e;
} }
@ -781,8 +815,9 @@ public class RpcClientImpl extends AbstractRpcClient {
e = new IOException("Could not set up IO Streams to " + server, t); e = new IOException("Could not set up IO Streams to " + server, t);
} }
} }
markClosed(e); if (markClosed(e)) {
close(); close();
}
throw e; throw e;
} }
} }
@ -922,9 +957,10 @@ public class RpcClientImpl extends AbstractRpcClient {
/* Receive a response. /* Receive a response.
* Because only one receiver, so no synchronization on in. * Because only one receiver, so no synchronization on in.
* @return true if connection should be closed by caller
*/ */
protected void readResponse() { protected boolean readResponse() {
if (shouldCloseConnection.get()) return; if (shouldCloseConnection.get()) return false;
Call call = null; Call call = null;
boolean expectedCall = false; boolean expectedCall = false;
try { try {
@ -946,14 +982,14 @@ public class RpcClientImpl extends AbstractRpcClient {
int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
int whatIsLeftToRead = totalSize - readSoFar; int whatIsLeftToRead = totalSize - readSoFar;
IOUtils.skipFully(in, whatIsLeftToRead); IOUtils.skipFully(in, whatIsLeftToRead);
return; return false;
} }
if (responseHeader.hasException()) { if (responseHeader.hasException()) {
ExceptionResponse exceptionResponse = responseHeader.getException(); ExceptionResponse exceptionResponse = responseHeader.getException();
RemoteException re = createRemoteException(exceptionResponse); RemoteException re = createRemoteException(exceptionResponse);
call.setException(re); call.setException(re);
if (isFatalConnectionException(exceptionResponse)) { if (isFatalConnectionException(exceptionResponse)) {
markClosed(re); return markClosed(re);
} }
} else { } else {
Message value = null; Message value = null;
@ -980,11 +1016,12 @@ public class RpcClientImpl extends AbstractRpcClient {
if (LOG.isTraceEnabled()) LOG.trace("ignored", e); if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
} else { } else {
// Treat this as a fatal condition and close this connection // Treat this as a fatal condition and close this connection
markClosed(e); return markClosed(e);
} }
} finally { } finally {
cleanupCalls(false); cleanupCalls(false);
} }
return false;
} }
/** /**
@ -1010,18 +1047,22 @@ public class RpcClientImpl extends AbstractRpcClient {
e.getStackTrace(), doNotRetry); e.getStackTrace(), doNotRetry);
} }
protected synchronized void markClosed(IOException e) { /*
* @return true if shouldCloseConnection is set true by this thread; false otherwise
*/
protected boolean markClosed(IOException e) {
if (e == null) throw new NullPointerException(); if (e == null) throw new NullPointerException();
if (shouldCloseConnection.compareAndSet(false, true)) { boolean ret = shouldCloseConnection.compareAndSet(false, true);
if (ret) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage()); LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
} }
if (callSender != null) { if (callSender != null) {
callSender.close(); callSender.close();
} }
notifyAll();
} }
return ret;
} }
@ -1120,8 +1161,9 @@ public class RpcClientImpl extends AbstractRpcClient {
// In case the CallSender did not setupIOStreams() yet, the Connection may not be started // In case the CallSender did not setupIOStreams() yet, the Connection may not be started
// at all (if CallSender has a cancelled Call it can happen). See HBASE-13851 // at all (if CallSender has a cancelled Call it can happen). See HBASE-13851
if (!conn.isAlive()) { if (!conn.isAlive()) {
conn.markClosed(new InterruptedIOException("RpcClient is closing")); if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
conn.close(); conn.close();
}
} }
} }
} }