HBASE-11835 Wrong managenement of non expected calls in the client (Nicolas Liochon)

This commit is contained in:
stack 2014-10-30 12:41:54 -07:00
parent f20fac41df
commit 9f4b6ac06c
2 changed files with 37 additions and 25 deletions

View File

@ -338,7 +338,8 @@ public class RpcClient {
protected final static Map<AuthenticationProtos.TokenIdentifier.Kind,
TokenSelector<? extends TokenIdentifier>> tokenHandlers =
new HashMap<AuthenticationProtos.TokenIdentifier.Kind, TokenSelector<? extends TokenIdentifier>>();
new HashMap<AuthenticationProtos.TokenIdentifier.Kind,
TokenSelector<? extends TokenIdentifier>>();
static {
tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
new AuthenticationTokenSelector());
@ -652,18 +653,21 @@ public class RpcClient {
socket.getOutputStream().close();
}
} catch (IOException ignored) { // Can happen if the socket is already closed
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
}
try {
if (socket.getInputStream() != null) {
socket.getInputStream().close();
}
} catch (IOException ignored) { // Can happen if the socket is already closed
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
}
try {
if (socket.getChannel() != null) {
socket.getChannel().close();
}
} catch (IOException ignored) { // Can happen if the socket is already closed
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
}
try {
socket.close();
@ -1161,18 +1165,18 @@ public class RpcClient {
int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
int whatIsLeftToRead = totalSize - readSoFar;
IOUtils.skipFully(in, whatIsLeftToRead);
return;
}
if (responseHeader.hasException()) {
ExceptionResponse exceptionResponse = responseHeader.getException();
RemoteException re = createRemoteException(exceptionResponse);
if (expectedCall) call.setException(re);
call.setException(re);
if (isFatalConnectionException(exceptionResponse)) {
markClosed(re);
}
} else {
Message value = null;
// Call may be null because it may have timeout and been cleaned up on this side already
if (expectedCall && call.responseDefaultType != null) {
if (call.responseDefaultType != null) {
Builder builder = call.responseDefaultType.newBuilderForType();
builder.mergeDelimitedFrom(in);
value = builder.build();
@ -1184,9 +1188,7 @@ public class RpcClient {
IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
}
// it's possible that this call may have been cleaned up due to a RPC
// timeout, so check if it still exists before setting the value.
if (expectedCall) call.setResponse(value, cellBlockScanner);
call.setResponse(value, cellBlockScanner);
}
} catch (IOException e) {
if (expectedCall) call.setException(e);
@ -1194,6 +1196,7 @@ public class RpcClient {
// Clean up open calls but don't treat this as a fatal condition,
// since we expect certain responses to not make it by the specified
// {@link ConnectionId#rpcTimeout}.
if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
} else {
// Treat this as a fatal condition and close this connection
markClosed(e);
@ -1489,7 +1492,6 @@ public class RpcClient {
@Override
public void run(Object parameter) {
connection.callSender.remove(cts);
call.callComplete();
}
});
if (pcrc.isCanceled()) {
@ -1662,7 +1664,7 @@ public class RpcClient {
public int hashCode() {
int hashcode = (address.hashCode() +
PRIME * (PRIME * this.serviceName.hashCode() ^
(ticket == null ? 0 : ticket.hashCode()) ));
(ticket == null ? 0 : ticket.hashCode())));
return hashcode;
}
}

View File

@ -685,6 +685,7 @@ public class RpcServer implements RpcServerInterface {
doAccept(key);
}
} catch (IOException ignored) {
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
}
key = null;
}
@ -722,7 +723,9 @@ public class RpcServer implements RpcServerInterface {
try {
acceptChannel.close();
selector.close();
} catch (IOException ignored) { }
} catch (IOException ignored) {
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
}
selector= null;
acceptChannel= null;
@ -870,22 +873,28 @@ public class RpcServer implements RpcServerInterface {
/**
* Take the list of the connections that want to write, and register them
* in the selector.
* in the selector.
*/
private void registerWrites(){
private void registerWrites() {
Iterator<Connection> it = writingCons.iterator();
while (it.hasNext()){
while (it.hasNext()) {
Connection c = it.next();
it.remove();
SelectionKey sk = c.channel.keyFor(writeSelector);
if (sk == null){
try {
c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
} catch (ClosedChannelException e) {
// ignore: the client went away.
try {
if (sk == null) {
try {
c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
} catch (ClosedChannelException e) {
// ignore: the client went away.
if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
}
} else {
sk.interestOps(SelectionKey.OP_WRITE);
}
} else {
sk.interestOps(SelectionKey.OP_WRITE);
} catch (CancelledKeyException e) {
// ignore: the client went away.
if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
}
}
}
@ -1048,15 +1057,17 @@ public class RpcServer implements RpcServerInterface {
/**
* Process all the responses for this connection
*
* @return true if all the calls were processed or that someone else is doing it. false if there
* is still some work to do. In this case, we expect the caller to delay us.
* @return true if all the calls were processed or that someone else is doing it.
* false if there * is still some work to do. In this case, we expect the caller to
* delay us.
* @throws IOException
*/
private boolean processAllResponses(final Connection connection) throws IOException {
// We want only one writer on the channel for a connection at a time.
connection.responseWriteLock.lock();
try {
for (int i = 0; i < 20; i++) { // protection if some handlers manage to need all the responder
for (int i = 0; i < 20; i++) {
// protection if some handlers manage to need all the responder
Call call = connection.responseQueue.pollFirst();
if (call == null) {
return true;
@ -1279,8 +1290,7 @@ public class RpcServer implements RpcServerInterface {
secretManager, this));
break;
default:
UserGroupInformation current = UserGroupInformation
.getCurrentUser();
UserGroupInformation current = UserGroupInformation.getCurrentUser();
String fullName = current.getUserName();
if (LOG.isDebugEnabled()) {
LOG.debug("Kerberos principal name is " + fullName);