diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index fd3bab0edce..5ece8ae5b08 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -152,6 +152,18 @@ public class RpcClientImpl extends AbstractRpcClient { } } + /* + * This is the return value from {@link #waitForWork()} indicating whether run() method should: + * read response + * close the connection + * take no action - connection would be closed by others + */ + private enum WaitForWorkResult { + READ_RESPONSE, + CALLER_SHOULD_CLOSE, + CLOSED + } + /** Thread that reads responses and notifies callers. Each connection owns a * socket connected to a remote address. Calls are multiplexed through this * socket: responses may be delivered out of order. */ @@ -243,12 +255,13 @@ public class RpcClientImpl extends AbstractRpcClient { */ @Override public void run() { + boolean closeBySelf = false; while (!shouldCloseConnection.get()) { CallFuture cts = null; try { cts = callsToWrite.take(); } catch (InterruptedException e) { - markClosed(new InterruptedIOException()); + closeBySelf = markClosed(new InterruptedIOException()); } if (cts == null || cts == CallFuture.DEATH_PILL) { @@ -272,11 +285,14 @@ public class RpcClientImpl extends AbstractRpcClient { + ", message =" + e.getMessage()); } cts.call.setException(e); - markClosed(e); + closeBySelf = markClosed(e); } } cleanup(); + if (closeBySelf) { + close(); + } } /** @@ -510,27 +526,28 @@ public class RpcClientImpl extends AbstractRpcClient { * it is idle too long, it is marked as to be closed, * or the client is marked as not running. * - * @return true if it is time to read a response; false otherwise. + * @return WaitForWorkResult indicating whether it is time to read response; + * if the caller should close; or otherwise */ - protected synchronized boolean waitForWork() throws InterruptedException { + protected synchronized WaitForWorkResult waitForWork() throws InterruptedException { // beware of the concurrent access to the calls list: we can add calls, but as well // remove them. long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose; while (true) { if (shouldCloseConnection.get()) { - return false; + return WaitForWorkResult.CLOSED; } if (!running.get()) { - markClosed(new IOException("stopped with " + calls.size() + " pending request(s)")); - return false; + if (markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"))) { + return WaitForWorkResult.CALLER_SHOULD_CLOSE; + } + return WaitForWorkResult.CLOSED; } if (!calls.isEmpty()) { - // shouldCloseConnection can be set to true by a parallel thread here. The caller - // will need to check anyway. - return true; + return WaitForWorkResult.READ_RESPONSE; } if (EnvironmentEdgeManager.currentTime() >= waitUntil) { @@ -538,9 +555,11 @@ public class RpcClientImpl extends AbstractRpcClient { // We expect the number of calls to be zero here, but actually someone can // adds a call at the any moment, as there is no synchronization between this task // and adding new calls. It's not a big issue, but it will get an exception. - markClosed(new IOException( - "idle connection closed with " + calls.size() + " pending request(s)")); - return false; + if (markClosed(new IOException( + "idle connection closed with " + calls.size() + " pending request(s)"))) { + return WaitForWorkResult.CALLER_SHOULD_CLOSE; + } + return WaitForWorkResult.CLOSED; } wait(Math.min(minIdleTimeBeforeClose, 1000)); @@ -557,23 +576,37 @@ public class RpcClientImpl extends AbstractRpcClient { LOG.trace(getName() + ": starting, connections " + connections.size()); } + WaitForWorkResult result = WaitForWorkResult.CALLER_SHOULD_CLOSE; try { - while (waitForWork()) { // Wait here for work - read or close connection - readResponse(); + result = waitForWork(); // Wait here for work - read or close connection + while (result == WaitForWorkResult.READ_RESPONSE) { + if (readResponse()) { + // shouldCloseConnection is set to true by readResponse(). Close the connection + result = WaitForWorkResult.CALLER_SHOULD_CLOSE; + } else { + result = waitForWork(); + } } } catch (InterruptedException t) { if (LOG.isTraceEnabled()) { LOG.trace(getName() + ": interrupted while waiting for call responses"); } - markClosed(ExceptionUtil.asInterrupt(t)); + if (markClosed(ExceptionUtil.asInterrupt(t))) { + // shouldCloseConnection is set to true. Close connection + result = WaitForWorkResult.CALLER_SHOULD_CLOSE; + } } catch (Throwable t) { if (LOG.isDebugEnabled()) { LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t); } - markClosed(new IOException("Unexpected throwable while waiting call responses", t)); + if (markClosed(new IOException("Unexpected throwable while waiting call responses", t))) { + // shouldCloseConnection is set to true. Close connection + result = WaitForWorkResult.CALLER_SHOULD_CLOSE; + } + } + if (result == WaitForWorkResult.CALLER_SHOULD_CLOSE) { + close(); } - - close(); if (LOG.isTraceEnabled()) { LOG.trace(getName() + ": stopped, connections " + connections.size()); @@ -702,8 +735,9 @@ public class RpcClientImpl extends AbstractRpcClient { } IOException e = new FailedServerException( "This server is in the failed servers list: " + server); - markClosed(e); - close(); + if (markClosed(e)) { + close(); + } throw e; } @@ -781,8 +815,9 @@ public class RpcClientImpl extends AbstractRpcClient { e = new IOException("Could not set up IO Streams to " + server, t); } } - markClosed(e); - close(); + if (markClosed(e)) { + close(); + } throw e; } } @@ -922,9 +957,10 @@ public class RpcClientImpl extends AbstractRpcClient { /* Receive a response. * Because only one receiver, so no synchronization on in. + * @return true if connection should be closed by caller */ - protected void readResponse() { - if (shouldCloseConnection.get()) return; + protected boolean readResponse() { + if (shouldCloseConnection.get()) return false; Call call = null; boolean expectedCall = false; try { @@ -946,14 +982,14 @@ public class RpcClientImpl extends AbstractRpcClient { int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); int whatIsLeftToRead = totalSize - readSoFar; IOUtils.skipFully(in, whatIsLeftToRead); - return; + return false; } if (responseHeader.hasException()) { ExceptionResponse exceptionResponse = responseHeader.getException(); RemoteException re = createRemoteException(exceptionResponse); call.setException(re); if (isFatalConnectionException(exceptionResponse)) { - markClosed(re); + return markClosed(re); } } else { Message value = null; @@ -980,11 +1016,12 @@ public class RpcClientImpl extends AbstractRpcClient { if (LOG.isTraceEnabled()) LOG.trace("ignored", e); } else { // Treat this as a fatal condition and close this connection - markClosed(e); + return markClosed(e); } } finally { cleanupCalls(false); } + return false; } /** @@ -1010,18 +1047,22 @@ public class RpcClientImpl extends AbstractRpcClient { e.getStackTrace(), doNotRetry); } - protected synchronized void markClosed(IOException e) { + /* + * @return true if shouldCloseConnection is set true by this thread; false otherwise + */ + protected boolean markClosed(IOException e) { if (e == null) throw new NullPointerException(); - if (shouldCloseConnection.compareAndSet(false, true)) { + boolean ret = shouldCloseConnection.compareAndSet(false, true); + if (ret) { if (LOG.isTraceEnabled()) { LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage()); } if (callSender != null) { callSender.close(); } - notifyAll(); } + return ret; } @@ -1120,8 +1161,9 @@ public class RpcClientImpl extends AbstractRpcClient { // In case the CallSender did not setupIOStreams() yet, the Connection may not be started // at all (if CallSender has a cancelled Call it can happen). See HBASE-13851 if (!conn.isAlive()) { - conn.markClosed(new InterruptedIOException("RpcClient is closing")); - conn.close(); + if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) { + conn.close(); + } } } }