From ddfc421a7dc3180e3700c092eab03a770f2498ee Mon Sep 17 00:00:00 2001 From: nkeywal Date: Tue, 4 Mar 2014 14:33:57 +0000 Subject: [PATCH] HBASE-10637 rpcClient: Setup the iostreams when writing git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1574110 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/hbase/ipc/RpcClient.java | 36 +++++++++---------- .../hadoop/hbase/util/ExceptionUtil.java | 6 ++-- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 1e41c8b5ef7..d20f376d9cd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -877,10 +877,15 @@ public class RpcClient { } protected synchronized void setupIOstreams() throws IOException { - if (socket != null || shouldCloseConnection.get()) { + if (socket != null) { + // The connection is already available. Perfect. return; } + if (shouldCloseConnection.get()){ + throw new IOException("This connection is closing"); + } + if (failedServers.isFailedServer(remoteId.getAddress())) { if (LOG.isDebugEnabled()) { LOG.debug("Not trying to connect to " + server + @@ -926,6 +931,7 @@ public class RpcClient { } }); } catch (Exception ex) { + ExceptionUtil.rethrowIfInterrupt(ex); if (rand == null) { rand = new Random(); } @@ -952,12 +958,14 @@ public class RpcClient { return; } } catch (Throwable t) { - failedServers.addToFailedServers(remoteId.address); - IOException e; - if (t instanceof IOException) { - e = (IOException)t; - } else { - e = new IOException("Could not set up IO Streams", t); + IOException e = ExceptionUtil.asInterrupt(t); + if (e == null) { + failedServers.addToFailedServers(remoteId.address); + if (t instanceof IOException) { + e = (IOException) t; + } else { + e = new IOException("Could not set up IO Streams to " + server, t); + } } markClosed(e); close(); @@ -1064,6 +1072,8 @@ public class RpcClient { if (priority != 0) builder.setPriority(priority); RequestHeader header = builder.build(); + setupIOstreams(); + // Now we're going to write the call. We take the lock, then check that the connection // is still valid, and, if so we do the write to the socket. If the write fails, we don't // know where we stand, we have to close the connection. @@ -1445,8 +1455,7 @@ public class RpcClient { Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority) throws IOException, InterruptedException { Call call = new Call(md, param, cells, returnType, callTimeout); - Connection connection = - getConnection(ticket, call, addr, this.codec, this.compressor); + Connection connection = getConnection(ticket, call, addr, this.codec, this.compressor); CallFuture cts = null; if (connection.callSender != null){ @@ -1553,15 +1562,6 @@ public class RpcClient { } } - //we don't invoke the method below inside "synchronized (connections)" - //block above. The reason for that is if the server happens to be slow, - //it will take longer to establish a connection and that will slow the - //entire system down. - //Moreover, if the connection is currently created, there will be many threads - // waiting here; as setupIOstreams is synchronized. If the connection fails with a - // timeout, they will all fail simultaneously. This is checked in setupIOstreams. - connection.setupIOstreams(); - return connection; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java index 65bf9c38c0d..35bb2599537 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java @@ -28,7 +28,7 @@ import java.nio.channels.ClosedByInterruptException; * - InterruptedException * - InterruptedIOException (inherits IOException); used in IO * - ClosedByInterruptException (inherits IOException) - * , - SocketTimeoutException inherits InterruptedIOException but is not a real + * - SocketTimeoutException inherits InterruptedIOException but is not a real * interruption, so we have to distinguish the case. This pattern is unfortunately common. */ public class ExceptionUtil { @@ -39,7 +39,7 @@ public class ExceptionUtil { public static boolean isInterrupt(Throwable t) { if (t instanceof InterruptedException) return true; if (t instanceof SocketTimeoutException) return false; - return (t instanceof InterruptedIOException); + return (t instanceof InterruptedIOException || t instanceof ClosedByInterruptException); } /** @@ -58,7 +58,7 @@ public class ExceptionUtil { if (t instanceof InterruptedIOException) return (InterruptedIOException) t; - if (t instanceof InterruptedException) { + if (t instanceof InterruptedException || t instanceof ClosedByInterruptException) { InterruptedIOException iie = new InterruptedIOException(); iie.initCause(t); return iie;