HBASE-10814 RpcClient: some calls can get stuck when connection is closing

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1581414 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2014-03-25 16:48:29 +00:00
parent 4a0904502d
commit 0e5d21e96a
1 changed file with 27 additions and 16 deletions

View File

@ -270,7 +270,7 @@ public class RpcClient {
* Check if the call did timeout. Set an exception (includes a notify) if it's the case. * Check if the call did timeout. Set an exception (includes a notify) if it's the case.
* @return true if the call is on timeout, false otherwise. * @return true if the call is on timeout, false otherwise.
*/ */
public boolean checkTimeout() { public boolean checkAndSetTimeout() {
if (timeout == 0){ if (timeout == 0){
return false; return false;
} }
@ -358,9 +358,9 @@ public class RpcClient {
* see {@link org.apache.hadoop.hbase.ipc.RpcClient.Connection.CallSender} * see {@link org.apache.hadoop.hbase.ipc.RpcClient.Connection.CallSender}
*/ */
private static class CallFuture { private static class CallFuture {
Call call; final Call call;
int priority; final int priority;
Span span; final Span span;
// We will use this to stop the writer // We will use this to stop the writer
final static CallFuture DEATH_PILL = new CallFuture(null, -1, null); final static CallFuture DEATH_PILL = new CallFuture(null, -1, null);
@ -472,7 +472,7 @@ public class RpcClient {
continue; continue;
} }
if (cts.call.checkTimeout()) { if (cts.call.checkAndSetTimeout()) {
continue; continue;
} }
@ -1118,11 +1118,12 @@ public class RpcClient {
*/ */
protected void readResponse() { protected void readResponse() {
if (shouldCloseConnection.get()) return; if (shouldCloseConnection.get()) return;
int totalSize; Call call = null;
boolean expectedCall = false;
try { try {
// See HBaseServer.Call.setResponse for where we write out the response. // See HBaseServer.Call.setResponse for where we write out the response.
// Total size of the response. Unused. But have to read it in anyways. // Total size of the response. Unused. But have to read it in anyways.
totalSize = in.readInt(); int totalSize = in.readInt();
// Read the header // Read the header
ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in); ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
@ -1131,8 +1132,8 @@ public class RpcClient {
LOG.debug(getName() + ": got response header " + LOG.debug(getName() + ": got response header " +
TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes"); TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes");
} }
Call call = calls.remove(id); call = calls.remove(id); // call.done have to be set before leaving this method
boolean expectedCall = (call != null && !call.done); expectedCall = (call != null && !call.done);
if (!expectedCall) { if (!expectedCall) {
// So we got a response for which we have no corresponding 'call' here on the client-side. // So we got a response for which we have no corresponding 'call' here on the client-side.
// We probably timed out waiting, cleaned up all references, and now the server decides // We probably timed out waiting, cleaned up all references, and now the server decides
@ -1148,14 +1149,13 @@ public class RpcClient {
if (responseHeader.hasException()) { if (responseHeader.hasException()) {
ExceptionResponse exceptionResponse = responseHeader.getException(); ExceptionResponse exceptionResponse = responseHeader.getException();
RemoteException re = createRemoteException(exceptionResponse); RemoteException re = createRemoteException(exceptionResponse);
if (expectedCall) call.setException(re);
if (isFatalConnectionException(exceptionResponse)) { if (isFatalConnectionException(exceptionResponse)) {
markClosed(re); markClosed(re);
} else {
if (expectedCall) call.setException(re);
} }
} else { } else {
Message value = null; Message value = null;
// Call may be null because it may have timedout and been cleaned up on this side already // Call may be null because it may have timeout and been cleaned up on this side already
if (expectedCall && call.responseDefaultType != null) { if (expectedCall && call.responseDefaultType != null) {
Builder builder = call.responseDefaultType.newBuilderForType(); Builder builder = call.responseDefaultType.newBuilderForType();
builder.mergeDelimitedFrom(in); builder.mergeDelimitedFrom(in);
@ -1173,6 +1173,7 @@ public class RpcClient {
if (expectedCall) call.setResponse(value, cellBlockScanner); if (expectedCall) call.setResponse(value, cellBlockScanner);
} }
} catch (IOException e) { } catch (IOException e) {
if (expectedCall) call.setException(e);
if (e instanceof SocketTimeoutException) { if (e instanceof SocketTimeoutException) {
// Clean up open calls but don't treat this as a fatal condition, // Clean up open calls but don't treat this as a fatal condition,
// since we expect certain responses to not make it by the specified // since we expect certain responses to not make it by the specified
@ -1183,6 +1184,11 @@ public class RpcClient {
} }
} finally { } finally {
cleanupCalls(false); cleanupCalls(false);
if (expectedCall && !call.done) {
LOG.warn("Coding error: code should be true for callId=" + call.id +
", server=" + getRemoteAddress() +
", shouldCloseConnection=" + shouldCloseConnection.get());
}
} }
} }
@ -1227,7 +1233,7 @@ public class RpcClient {
/** /**
* Cleanup the calls older than a given timeout, in milli seconds. * Cleanup the calls older than a given timeout, in milli seconds.
* @param allCalls for all calls, * @param allCalls true for all calls, false for only the calls in timeout
*/ */
protected synchronized void cleanupCalls(boolean allCalls) { protected synchronized void cleanupCalls(boolean allCalls) {
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator(); Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
@ -1238,10 +1244,11 @@ public class RpcClient {
itor.remove(); itor.remove();
} else if (allCalls) { } else if (allCalls) {
long waitTime = EnvironmentEdgeManager.currentTimeMillis() - c.getStartTime(); long waitTime = EnvironmentEdgeManager.currentTimeMillis() - c.getStartTime();
IOException ie = new IOException("Call id=" + c.id + ", waitTime=" + waitTime); IOException ie = new IOException("Connection to " + getRemoteAddress()
+ " is closing. Call id=" + c.id + ", waitTime=" + waitTime);
c.setException(ie); c.setException(ie);
itor.remove(); itor.remove();
} else if (c.checkTimeout()) { } else if (c.checkAndSetTimeout()) {
itor.remove(); itor.remove();
} else { } else {
// We expect the call to be ordered by timeout. It may not be the case, but stopping // We expect the call to be ordered by timeout. It may not be the case, but stopping
@ -1468,10 +1475,14 @@ public class RpcClient {
} }
while (!call.done) { while (!call.done) {
if (call.checkTimeout()) { if (call.checkAndSetTimeout()) {
if (cts != null) connection.callSender.remove(cts); if (cts != null) connection.callSender.remove(cts);
break; break;
} }
if (connection.shouldCloseConnection.get()) {
throw new IOException("Call id=" + call.id + " on server "
+ addr + " aborted: connection is closing");
}
try { try {
synchronized (call) { synchronized (call) {
call.wait(Math.min(call.remainingTime(), 1000) + 1); call.wait(Math.min(call.remainingTime(), 1000) + 1);