diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index a0b97af25c6..33e050dcf7c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -270,7 +270,7 @@ public class RpcClient { * Check if the call did timeout. Set an exception (includes a notify) if it's the case. * @return true if the call is on timeout, false otherwise. */ - public boolean checkTimeout() { + public boolean checkAndSetTimeout() { if (timeout == 0){ return false; } @@ -358,9 +358,9 @@ public class RpcClient { * see {@link org.apache.hadoop.hbase.ipc.RpcClient.Connection.CallSender} */ private static class CallFuture { - Call call; - int priority; - Span span; + final Call call; + final int priority; + final Span span; // We will use this to stop the writer final static CallFuture DEATH_PILL = new CallFuture(null, -1, null); @@ -472,7 +472,7 @@ public class RpcClient { continue; } - if (cts.call.checkTimeout()) { + if (cts.call.checkAndSetTimeout()) { continue; } @@ -1118,11 +1118,12 @@ public class RpcClient { */ protected void readResponse() { if (shouldCloseConnection.get()) return; - int totalSize; + Call call = null; + boolean expectedCall = false; try { // See HBaseServer.Call.setResponse for where we write out the response. // Total size of the response. Unused. But have to read it in anyways. 
- totalSize = in.readInt(); + int totalSize = in.readInt(); // Read the header ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in); @@ -1131,8 +1132,8 @@ public class RpcClient { LOG.debug(getName() + ": got response header " + TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes"); } - Call call = calls.remove(id); - boolean expectedCall = (call != null && !call.done); + call = calls.remove(id); // call.done has to be set before leaving this method + expectedCall = (call != null && !call.done); if (!expectedCall) { // So we got a response for which we have no corresponding 'call' here on the client-side. // We probably timed out waiting, cleaned up all references, and now the server decides @@ -1148,14 +1149,13 @@ public class RpcClient { if (responseHeader.hasException()) { ExceptionResponse exceptionResponse = responseHeader.getException(); RemoteException re = createRemoteException(exceptionResponse); + if (expectedCall) call.setException(re); if (isFatalConnectionException(exceptionResponse)) { markClosed(re); - } else { - if (expectedCall) call.setException(re); } } else { Message value = null; - // Call may be null because it may have timedout and been cleaned up on this side already + // Call may be null because it may have timed out and been cleaned up on this side already if (expectedCall && call.responseDefaultType != null) { Builder builder = call.responseDefaultType.newBuilderForType(); builder.mergeDelimitedFrom(in); @@ -1173,6 +1173,7 @@ public class RpcClient { if (expectedCall) call.setResponse(value, cellBlockScanner); } } catch (IOException e) { + if (expectedCall) call.setException(e); if (e instanceof SocketTimeoutException) { // Clean up open calls but don't treat this as a fatal condition, // since we expect certain responses to not make it by the specified @@ -1183,6 +1184,11 @@ public class RpcClient { } } finally { cleanupCalls(false); + if (expectedCall && !call.done) { + LOG.warn("Coding 
error: code should be true for callId=" + call.id + + ", server=" + getRemoteAddress() + + ", shouldCloseConnection=" + shouldCloseConnection.get()); + } } } @@ -1227,7 +1233,7 @@ public class RpcClient { /** * Cleanup the calls older than a given timeout, in milli seconds. - * @param allCalls for all calls, + * @param allCalls true for all calls, false for only the calls in timeout */ protected synchronized void cleanupCalls(boolean allCalls) { Iterator> itor = calls.entrySet().iterator(); @@ -1238,10 +1244,11 @@ public class RpcClient { itor.remove(); } else if (allCalls) { long waitTime = EnvironmentEdgeManager.currentTimeMillis() - c.getStartTime(); - IOException ie = new IOException("Call id=" + c.id + ", waitTime=" + waitTime); + IOException ie = new IOException("Connection to " + getRemoteAddress() + + " is closing. Call id=" + c.id + ", waitTime=" + waitTime); c.setException(ie); itor.remove(); - } else if (c.checkTimeout()) { + } else if (c.checkAndSetTimeout()) { itor.remove(); } else { // We expect the call to be ordered by timeout. It may not be the case, but stopping @@ -1468,10 +1475,14 @@ public class RpcClient { } while (!call.done) { - if (call.checkTimeout()) { + if (call.checkAndSetTimeout()) { if (cts != null) connection.callSender.remove(cts); break; } + if (connection.shouldCloseConnection.get()) { + throw new IOException("Call id=" + call.id + " on server " + + addr + " aborted: connection is closing"); + } try { synchronized (call) { call.wait(Math.min(call.remainingTime(), 1000) + 1);