HBASE-11835 Wrong managenement of non expected calls in the client (Nicolas Liochon)

This commit is contained in:
stack 2014-10-30 12:41:54 -07:00
parent f0091a9031
commit 29d486ff4e
2 changed files with 37 additions and 25 deletions

View File

@ -336,7 +336,8 @@ public class RpcClient {
protected final static Map<AuthenticationProtos.TokenIdentifier.Kind,
TokenSelector<? extends TokenIdentifier>> tokenHandlers =
new HashMap<AuthenticationProtos.TokenIdentifier.Kind, TokenSelector<? extends TokenIdentifier>>();
new HashMap<AuthenticationProtos.TokenIdentifier.Kind,
TokenSelector<? extends TokenIdentifier>>();
static {
tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
new AuthenticationTokenSelector());
@ -650,18 +651,21 @@ public class RpcClient {
socket.getOutputStream().close();
}
} catch (IOException ignored) { // Can happen if the socket is already closed
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
}
try {
if (socket.getInputStream() != null) {
socket.getInputStream().close();
}
} catch (IOException ignored) { // Can happen if the socket is already closed
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
}
try {
if (socket.getChannel() != null) {
socket.getChannel().close();
}
} catch (IOException ignored) { // Can happen if the socket is already closed
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
}
try {
socket.close();
@ -1159,18 +1163,18 @@ public class RpcClient {
int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
int whatIsLeftToRead = totalSize - readSoFar;
IOUtils.skipFully(in, whatIsLeftToRead);
return;
}
if (responseHeader.hasException()) {
ExceptionResponse exceptionResponse = responseHeader.getException();
RemoteException re = createRemoteException(exceptionResponse);
if (expectedCall) call.setException(re);
call.setException(re);
if (isFatalConnectionException(exceptionResponse)) {
markClosed(re);
}
} else {
Message value = null;
// Call may be null because it may have timeout and been cleaned up on this side already
if (expectedCall && call.responseDefaultType != null) {
if (call.responseDefaultType != null) {
Builder builder = call.responseDefaultType.newBuilderForType();
builder.mergeDelimitedFrom(in);
value = builder.build();
@ -1182,9 +1186,7 @@ public class RpcClient {
IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
}
// it's possible that this call may have been cleaned up due to a RPC
// timeout, so check if it still exists before setting the value.
if (expectedCall) call.setResponse(value, cellBlockScanner);
call.setResponse(value, cellBlockScanner);
}
} catch (IOException e) {
if (expectedCall) call.setException(e);
@ -1192,6 +1194,7 @@ public class RpcClient {
// Clean up open calls but don't treat this as a fatal condition,
// since we expect certain responses to not make it by the specified
// {@link ConnectionId#rpcTimeout}.
if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
} else {
// Treat this as a fatal condition and close this connection
markClosed(e);
@ -1487,7 +1490,6 @@ public class RpcClient {
@Override
public void run(Object parameter) {
connection.callSender.remove(cts);
call.callComplete();
}
});
if (pcrc.isCanceled()) {

View File

@ -681,6 +681,7 @@ public class RpcServer implements RpcServerInterface {
doAccept(key);
}
} catch (IOException ignored) {
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
}
key = null;
}
@ -718,7 +719,9 @@ public class RpcServer implements RpcServerInterface {
try {
acceptChannel.close();
selector.close();
} catch (IOException ignored) { }
} catch (IOException ignored) {
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
}
selector= null;
acceptChannel= null;
@ -874,15 +877,21 @@ public class RpcServer implements RpcServerInterface {
Connection c = it.next();
it.remove();
SelectionKey sk = c.channel.keyFor(writeSelector);
try {
if (sk == null) {
try {
c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
} catch (ClosedChannelException e) {
// ignore: the client went away.
if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
}
} else {
sk.interestOps(SelectionKey.OP_WRITE);
}
} catch (CancelledKeyException e) {
// ignore: the client went away.
if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
}
}
}
@ -1044,15 +1053,17 @@ public class RpcServer implements RpcServerInterface {
/**
* Process all the responses for this connection
*
* @return true if all the calls were processed or that someone else is doing it. false if there
* is still some work to do. In this case, we expect the caller to delay us.
* @return true if all the calls were processed or that someone else is doing it.
* false if there * is still some work to do. In this case, we expect the caller to
* delay us.
* @throws IOException
*/
private boolean processAllResponses(final Connection connection) throws IOException {
// We want only one writer on the channel for a connection at a time.
connection.responseWriteLock.lock();
try {
for (int i = 0; i < 20; i++) { // protection if some handlers manage to need all the responder
for (int i = 0; i < 20; i++) {
// protection if some handlers manage to need all the responder
Call call = connection.responseQueue.pollFirst();
if (call == null) {
return true;
@ -1275,8 +1286,7 @@ public class RpcServer implements RpcServerInterface {
secretManager, this));
break;
default:
UserGroupInformation current = UserGroupInformation
.getCurrentUser();
UserGroupInformation current = UserGroupInformation.getCurrentUser();
String fullName = current.getUserName();
if (LOG.isDebugEnabled()) {
LOG.debug("Kerberos principal name is " + fullName);