HBASE-10637 rpcClient: Setup the iostreams when writing

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1574110 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2014-03-04 14:33:57 +00:00
parent 50f857944e
commit ddfc421a7d
2 changed files with 21 additions and 21 deletions

View File

@ -877,10 +877,15 @@ public class RpcClient {
} }
protected synchronized void setupIOstreams() throws IOException { protected synchronized void setupIOstreams() throws IOException {
if (socket != null || shouldCloseConnection.get()) { if (socket != null) {
// The connection is already available. Perfect.
return; return;
} }
if (shouldCloseConnection.get()){
throw new IOException("This connection is closing");
}
if (failedServers.isFailedServer(remoteId.getAddress())) { if (failedServers.isFailedServer(remoteId.getAddress())) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Not trying to connect to " + server + LOG.debug("Not trying to connect to " + server +
@ -926,6 +931,7 @@ public class RpcClient {
} }
}); });
} catch (Exception ex) { } catch (Exception ex) {
ExceptionUtil.rethrowIfInterrupt(ex);
if (rand == null) { if (rand == null) {
rand = new Random(); rand = new Random();
} }
@ -952,12 +958,14 @@ public class RpcClient {
return; return;
} }
} catch (Throwable t) { } catch (Throwable t) {
failedServers.addToFailedServers(remoteId.address); IOException e = ExceptionUtil.asInterrupt(t);
IOException e; if (e == null) {
if (t instanceof IOException) { failedServers.addToFailedServers(remoteId.address);
e = (IOException)t; if (t instanceof IOException) {
} else { e = (IOException) t;
e = new IOException("Could not set up IO Streams", t); } else {
e = new IOException("Could not set up IO Streams to " + server, t);
}
} }
markClosed(e); markClosed(e);
close(); close();
@ -1064,6 +1072,8 @@ public class RpcClient {
if (priority != 0) builder.setPriority(priority); if (priority != 0) builder.setPriority(priority);
RequestHeader header = builder.build(); RequestHeader header = builder.build();
setupIOstreams();
// Now we're going to write the call. We take the lock, then check that the connection // Now we're going to write the call. We take the lock, then check that the connection
// is still valid, and, if so we do the write to the socket. If the write fails, we don't // is still valid, and, if so we do the write to the socket. If the write fails, we don't
// know where we stand, we have to close the connection. // know where we stand, we have to close the connection.
@ -1445,8 +1455,7 @@ public class RpcClient {
Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority) Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority)
throws IOException, InterruptedException { throws IOException, InterruptedException {
Call call = new Call(md, param, cells, returnType, callTimeout); Call call = new Call(md, param, cells, returnType, callTimeout);
Connection connection = Connection connection = getConnection(ticket, call, addr, this.codec, this.compressor);
getConnection(ticket, call, addr, this.codec, this.compressor);
CallFuture cts = null; CallFuture cts = null;
if (connection.callSender != null){ if (connection.callSender != null){
@ -1553,15 +1562,6 @@ public class RpcClient {
} }
} }
//we don't invoke the method below inside "synchronized (connections)"
//block above. The reason for that is if the server happens to be slow,
//it will take longer to establish a connection and that will slow the
//entire system down.
//Moreover, if the connection is currently created, there will be many threads
// waiting here; as setupIOstreams is synchronized. If the connection fails with a
// timeout, they will all fail simultaneously. This is checked in setupIOstreams.
connection.setupIOstreams();
return connection; return connection;
} }

View File

@ -28,7 +28,7 @@ import java.nio.channels.ClosedByInterruptException;
* - InterruptedException * - InterruptedException
* - InterruptedIOException (inherits IOException); used in IO * - InterruptedIOException (inherits IOException); used in IO
* - ClosedByInterruptException (inherits IOException) * - ClosedByInterruptException (inherits IOException)
* , - SocketTimeoutException inherits InterruptedIOException but is not a real * - SocketTimeoutException inherits InterruptedIOException but is not a real
* interruption, so we have to distinguish the case. This pattern is unfortunately common. * interruption, so we have to distinguish the case. This pattern is unfortunately common.
*/ */
public class ExceptionUtil { public class ExceptionUtil {
@ -39,7 +39,7 @@ public class ExceptionUtil {
public static boolean isInterrupt(Throwable t) { public static boolean isInterrupt(Throwable t) {
if (t instanceof InterruptedException) return true; if (t instanceof InterruptedException) return true;
if (t instanceof SocketTimeoutException) return false; if (t instanceof SocketTimeoutException) return false;
return (t instanceof InterruptedIOException); return (t instanceof InterruptedIOException || t instanceof ClosedByInterruptException);
} }
/** /**
@ -58,7 +58,7 @@ public class ExceptionUtil {
if (t instanceof InterruptedIOException) return (InterruptedIOException) t; if (t instanceof InterruptedIOException) return (InterruptedIOException) t;
if (t instanceof InterruptedException) { if (t instanceof InterruptedException || t instanceof ClosedByInterruptException) {
InterruptedIOException iie = new InterruptedIOException(); InterruptedIOException iie = new InterruptedIOException();
iie.initCause(t); iie.initCause(t);
return iie; return iie;