From acafc950d9347769c3729d571121b3525c6d5eb2 Mon Sep 17 00:00:00 2001 From: Masatake Iwasaki Date: Fri, 11 Mar 2016 15:12:22 +0900 Subject: [PATCH] HADOOP-12672. RPC timeout should not override IPC ping interval (iwasakims) --- .../hadoop-common/CHANGES.txt | 2 + .../java/org/apache/hadoop/ipc/Client.java | 35 ++++++---- .../src/main/resources/core-default.xml | 9 ++- .../java/org/apache/hadoop/ipc/TestRPC.java | 68 +++++++++++++++++++ 4 files changed, 94 insertions(+), 20 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index a2381fa6379..2d6107c853d 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -902,6 +902,8 @@ Release 2.6.5 - UNRELEASED HADOOP-12789. log classpath of ApplicationClassLoader at INFO level (Sangjin Lee via mingma) + HADOOP-12672. RPC timeout should not override IPC ping interval (iwasakims) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 42b99345bd6..4c5da8e2c44 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -383,15 +383,16 @@ private class Connection extends Thread { private Socket socket = null; // connected socket private DataInputStream in; private DataOutputStream out; - private int rpcTimeout; + private final int rpcTimeout; private int maxIdleTime; //connections will be culled if it was idle for //maxIdleTime msecs private final RetryPolicy connectionRetryPolicy; private final int maxRetriesOnSasl; private int maxRetriesOnSocketTimeouts; - private boolean tcpNoDelay; // if T then disable Nagle's Algorithm - private boolean doPing; //do we need to send ping message - private int pingInterval; 
// how often sends ping to the server in msecs + private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm + private final boolean doPing; //do we need to send ping message + private final int pingInterval; // how often sends ping to the server + private final int soTimeout; // used by ipc ping and rpc timeout private ByteArrayOutputStream pingRequest; // ping message // currently active calls @@ -429,6 +430,9 @@ public Connection(ConnectionId remoteId, int serviceClass) throws IOException { pingHeader.writeDelimitedTo(pingRequest); } this.pingInterval = remoteId.getPingInterval(); + this.soTimeout = + (rpcTimeout == 0 || (doPing && pingInterval < rpcTimeout))? + this.pingInterval : this.rpcTimeout; this.serviceClass = serviceClass; if (LOG.isDebugEnabled()) { LOG.debug("The ping interval is " + this.pingInterval + " ms."); @@ -479,12 +483,12 @@ protected PingInputStream(InputStream in) { /* Process timeout exception * if the connection is not going to be closed or - * is not configured to have a RPC timeout, send a ping. - * (if rpcTimeout is not set to be 0, then RPC should timeout. - * otherwise, throw the timeout exception. + * the RPC is not timed out yet, send a ping. 
*/ - private void handleTimeout(SocketTimeoutException e) throws IOException { - if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) { + private void handleTimeout(SocketTimeoutException e, int waiting) + throws IOException { + if (shouldCloseConnection.get() || !running.get() || + (0 < rpcTimeout && rpcTimeout <= waiting)) { throw e; } else { sendPing(); @@ -498,11 +502,13 @@ private void handleTimeout(SocketTimeoutException e) throws IOException { */ @Override public int read() throws IOException { + int waiting = 0; do { try { return super.read(); } catch (SocketTimeoutException e) { - handleTimeout(e); + waiting += soTimeout; + handleTimeout(e, waiting); } } while (true); } @@ -515,11 +521,13 @@ public int read() throws IOException { */ @Override public int read(byte[] buf, int off, int len) throws IOException { + int waiting = 0; do { try { return super.read(buf, off, len); } catch (SocketTimeoutException e) { - handleTimeout(e); + waiting += soTimeout; + handleTimeout(e, waiting); } } while (true); } @@ -612,10 +620,7 @@ private synchronized void setupConnection() throws IOException { } NetUtils.connect(this.socket, server, connectionTimeout); - if (rpcTimeout > 0) { - pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval - } - this.socket.setSoTimeout(pingInterval); + this.socket.setSoTimeout(soTimeout); return; } catch (ConnectTimeoutException toe) { /* Check for an address change and update the local reference. diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 96bac4d4aff..0f109b83455 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -980,7 +980,7 @@ for ldap providers in the same way as above does. true Send a ping to the server when timeout on reading the response, if set to true. 
If no failure is detected, the client retries until at least - a byte is read. + a byte is read or the time given by ipc.client.rpc-timeout.ms has passed. @@ -997,10 +997,9 @@ for ldap providers in the same way as above does. ipc.client.rpc-timeout.ms 0 Timeout on waiting response from server, in milliseconds. - Currently this timeout works only when ipc.client.ping is set to true - because it uses the same facilities with IPC ping. - The timeout overrides the ipc.ping.interval and client will throw exception - instead of sending ping when the interval is passed. + If ipc.client.ping is set to true and this rpc-timeout is greater than + the value of ipc.ping.interval, the effective value of the rpc-timeout is + rounded up to a multiple of ipc.ping.interval. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index b3e0553190d..60eff8bc3e4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -1107,6 +1107,74 @@ public void testClientRpcTimeout() throws Exception { } } + /** + * Test RPC timeout when ipc.client.ping is false. 
+ */ + @Test(timeout=30000) + public void testClientRpcTimeoutWithoutPing() throws Exception { + final Server server = new RPC.Builder(conf) + .setProtocol(TestProtocol.class).setInstance(new TestImpl()) + .setBindAddress(ADDRESS).setPort(0) + .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true) + .build(); + server.start(); + + final Configuration conf = new Configuration(); + conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000); + final TestProtocol proxy = + RPC.getProxy(TestProtocol.class, TestProtocol.versionID, + NetUtils.getConnectAddress(server), conf); + + try { + proxy.sleep(3000); + fail("RPC should time out."); + } catch (SocketTimeoutException e) { + LOG.info("got expected timeout.", e); + } finally { + server.stop(); + RPC.stopProxy(proxy); + } + } + + /** + * Test RPC timeout greater than ipc.ping.interval. + */ + @Test(timeout=30000) + public void testClientRpcTimeoutGreaterThanPingInterval() throws Exception { + final Server server = new RPC.Builder(conf) + .setProtocol(TestProtocol.class).setInstance(new TestImpl()) + .setBindAddress(ADDRESS).setPort(0) + .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true) + .build(); + server.start(); + + final Configuration conf = new Configuration(); + conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); + conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 800); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000); + final TestProtocol proxy = + RPC.getProxy(TestProtocol.class, TestProtocol.versionID, + NetUtils.getConnectAddress(server), conf); + + // should not time out. 
+ proxy.sleep(300); + + // should not time out because effective rpc-timeout is + // multiple of ping interval: 1600 (= 800 * (1000 / 800 + 1)) + proxy.sleep(1300); + + try { + proxy.sleep(2000); + fail("RPC should time out."); + } catch (SocketTimeoutException e) { + LOG.info("got expected timeout.", e); + } finally { + server.stop(); + RPC.stopProxy(proxy); + } + } + public static void main(String[] args) throws IOException { new TestRPC().testCallsInternal(conf);