diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index e917489013e..3fcb331ce97 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -336,7 +336,8 @@ public class RpcClient { protected final static Map> tokenHandlers = - new HashMap>(); + new HashMap>(); static { tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector()); @@ -650,18 +651,21 @@ public class RpcClient { socket.getOutputStream().close(); } } catch (IOException ignored) { // Can happen if the socket is already closed + if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); } try { if (socket.getInputStream() != null) { socket.getInputStream().close(); } } catch (IOException ignored) { // Can happen if the socket is already closed + if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); } try { if (socket.getChannel() != null) { socket.getChannel().close(); } } catch (IOException ignored) { // Can happen if the socket is already closed + if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); } try { socket.close(); @@ -1159,18 +1163,18 @@ public class RpcClient { int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); int whatIsLeftToRead = totalSize - readSoFar; IOUtils.skipFully(in, whatIsLeftToRead); + return; } if (responseHeader.hasException()) { ExceptionResponse exceptionResponse = responseHeader.getException(); RemoteException re = createRemoteException(exceptionResponse); - if (expectedCall) call.setException(re); + call.setException(re); if (isFatalConnectionException(exceptionResponse)) { markClosed(re); } } else { Message value = null; - // Call may be null because it may have timeout and been cleaned up on this side already - if (expectedCall && call.responseDefaultType != null) { + if (call.responseDefaultType != null) { Builder builder = call.responseDefaultType.newBuilderForType(); builder.mergeDelimitedFrom(in); value = builder.build(); @@ -1182,9 +1186,7 @@ public class RpcClient { IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length); cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock); } - // it's possible that this call may have been cleaned up due to a RPC - // timeout, so check if it still exists before setting the value. - if (expectedCall) call.setResponse(value, cellBlockScanner); + call.setResponse(value, cellBlockScanner); } } catch (IOException e) { if (expectedCall) call.setException(e); @@ -1192,6 +1194,7 @@ public class RpcClient { // Clean up open calls but don't treat this as a fatal condition, // since we expect certain responses to not make it by the specified // {@link ConnectionId#rpcTimeout}. + if (LOG.isTraceEnabled()) LOG.trace("ignored", e); } else { // Treat this as a fatal condition and close this connection markClosed(e); @@ -1487,7 +1490,6 @@ public class RpcClient { @Override public void run(Object parameter) { connection.callSender.remove(cts); - call.callComplete(); } }); if (pcrc.isCanceled()) { @@ -1660,7 +1662,7 @@ public class RpcClient { public int hashCode() { int hashcode = (address.hashCode() + PRIME * (PRIME * this.serviceName.hashCode() ^ - (ticket == null ? 0 : ticket.hashCode()) )); + (ticket == null ? 0 : ticket.hashCode()))); return hashcode; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 22f3d85bf38..9897914608e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -681,6 +681,7 @@ public class RpcServer implements RpcServerInterface { doAccept(key); } } catch (IOException ignored) { + if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); } key = null; } @@ -718,7 +719,9 @@ public class RpcServer implements RpcServerInterface { try { acceptChannel.close(); selector.close(); - } catch (IOException ignored) { } + } catch (IOException ignored) { + if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); + } selector= null; acceptChannel= null; @@ -866,22 +869,28 @@ public class RpcServer implements RpcServerInterface { /** * Take the list of the connections that want to write, and register them - * in the selector. + * in the selector. */ - private void registerWrites(){ + private void registerWrites() { Iterator it = writingCons.iterator(); - while (it.hasNext()){ + while (it.hasNext()) { Connection c = it.next(); it.remove(); SelectionKey sk = c.channel.keyFor(writeSelector); - if (sk == null){ - try { - c.channel.register(writeSelector, SelectionKey.OP_WRITE, c); - } catch (ClosedChannelException e) { - // ignore: the client went away. + try { + if (sk == null) { + try { + c.channel.register(writeSelector, SelectionKey.OP_WRITE, c); + } catch (ClosedChannelException e) { + // ignore: the client went away. + if (LOG.isTraceEnabled()) LOG.trace("ignored", e); + } + } else { + sk.interestOps(SelectionKey.OP_WRITE); } - } else { - sk.interestOps(SelectionKey.OP_WRITE); + } catch (CancelledKeyException e) { + // ignore: the client went away. + if (LOG.isTraceEnabled()) LOG.trace("ignored", e); } } } @@ -1044,15 +1053,17 @@ public class RpcServer implements RpcServerInterface { /** * Process all the responses for this connection * - * @return true if all the calls were processed or that someone else is doing it. false if there - * is still some work to do. In this case, we expect the caller to delay us. + * @return true if all the calls were processed or that someone else is doing it. + * false if there * is still some work to do. In this case, we expect the caller to + * delay us. * @throws IOException */ private boolean processAllResponses(final Connection connection) throws IOException { // We want only one writer on the channel for a connection at a time. connection.responseWriteLock.lock(); try { - for (int i = 0; i < 20; i++) { // protection if some handlers manage to need all the responder + for (int i = 0; i < 20; i++) { + // protection if some handlers manage to need all the responder Call call = connection.responseQueue.pollFirst(); if (call == null) { return true; @@ -1275,8 +1286,7 @@ public class RpcServer implements RpcServerInterface { secretManager, this)); break; default: - UserGroupInformation current = UserGroupInformation - .getCurrentUser(); + UserGroupInformation current = UserGroupInformation.getCurrentUser(); String fullName = current.getUserName(); if (LOG.isDebugEnabled()) { LOG.debug("Kerberos principal name is " + fullName);