diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 093fe1e8e99..efdb3f530e9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -239,14 +239,33 @@ public class Client {
*
* @param conf Configuration
* @return the timeout period in milliseconds. -1 if no timeout value is set
+ * @deprecated use {@link #getRpcTimeout(Configuration)} instead
*/
+ @Deprecated
final public static int getTimeout(Configuration conf) {
+ int timeout = getRpcTimeout(conf);
+ if (timeout > 0) {
+ return timeout;
+ }
if (!conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY,
CommonConfigurationKeys.IPC_CLIENT_PING_DEFAULT)) {
return getPingInterval(conf);
}
return -1;
}
+
+ /**
+ * The time after which a RPC will timeout.
+ *
+ * @param conf Configuration
+ * @return the timeout period in milliseconds.
+ */
+ public static final int getRpcTimeout(Configuration conf) {
+ int timeout =
+ conf.getInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY,
+ CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_DEFAULT);
+ return (timeout < 0) ? 0 : timeout;
+ }
/**
* set the connection timeout value in configuration
*
@@ -386,7 +405,7 @@ public class Client {
private Socket socket = null; // connected socket
private DataInputStream in;
private DataOutputStream out;
- private int rpcTimeout;
+ private final int rpcTimeout;
private int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
private final RetryPolicy connectionRetryPolicy;
@@ -394,8 +413,9 @@ public class Client {
private int maxRetriesOnSocketTimeouts;
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private final boolean tcpLowLatency; // if T then use low-delay QoS
- private boolean doPing; //do we need to send ping message
- private int pingInterval; // how often sends ping to the server in msecs
+ private final boolean doPing; //do we need to send ping message
+ private final int pingInterval; // how often sends ping to the server
+ private final int soTimeout; // used by ipc ping and rpc timeout
private ByteArrayOutputStream pingRequest; // ping message
// currently active calls
@@ -434,6 +454,14 @@ public class Client {
pingHeader.writeDelimitedTo(pingRequest);
}
this.pingInterval = remoteId.getPingInterval();
+ if (rpcTimeout > 0) {
+ // effective rpc timeout is rounded up to multiple of pingInterval
+ // if pingInterval < rpcTimeout.
+ this.soTimeout = (doPing && pingInterval < rpcTimeout) ?
+ pingInterval : rpcTimeout;
+ } else {
+ this.soTimeout = pingInterval;
+ }
this.serviceClass = serviceClass;
if (LOG.isDebugEnabled()) {
LOG.debug("The ping interval is " + this.pingInterval + " ms.");
@@ -484,12 +512,12 @@ public class Client {
/* Process timeout exception
* if the connection is not going to be closed or
- * is not configured to have a RPC timeout, send a ping.
- * (if rpcTimeout is not set to be 0, then RPC should timeout.
- * otherwise, throw the timeout exception.
+ * the RPC is not timed out yet, send a ping.
*/
- private void handleTimeout(SocketTimeoutException e) throws IOException {
- if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {
+ private void handleTimeout(SocketTimeoutException e, int waiting)
+ throws IOException {
+ if (shouldCloseConnection.get() || !running.get() ||
+ (0 < rpcTimeout && rpcTimeout <= waiting)) {
throw e;
} else {
sendPing();
@@ -503,11 +531,13 @@ public class Client {
*/
@Override
public int read() throws IOException {
+ int waiting = 0;
do {
try {
return super.read();
} catch (SocketTimeoutException e) {
- handleTimeout(e);
+ waiting += soTimeout;
+ handleTimeout(e, waiting);
}
} while (true);
}
@@ -520,11 +550,13 @@ public class Client {
*/
@Override
public int read(byte[] buf, int off, int len) throws IOException {
+ int waiting = 0;
do {
try {
return super.read(buf, off, len);
} catch (SocketTimeoutException e) {
- handleTimeout(e);
+ waiting += soTimeout;
+ handleTimeout(e, waiting);
}
} while (true);
}
@@ -632,10 +664,7 @@ public class Client {
}
NetUtils.connect(this.socket, server, connectionTimeout);
- if (rpcTimeout > 0) {
- pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
- }
- this.socket.setSoTimeout(pingInterval);
+ this.socket.setSoTimeout(soTimeout);
return;
} catch (ConnectTimeoutException toe) {
/* Check for an address change and update the local reference.
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 28e0a8a5603..f3dbc5b6d4b 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1055,7 +1055,7 @@
true
Send a ping to the server when timeout on reading the response,
if set to true. If no failure is detected, the client retries until at least
- a byte is read.
+ a byte is read or the time given by ipc.client.rpc-timeout.ms is passed.
@@ -1072,10 +1072,9 @@
ipc.client.rpc-timeout.ms
0
Timeout on waiting response from server, in milliseconds.
- Currently this timeout works only when ipc.client.ping is set to true
- because it uses the same facilities with IPC ping.
- The timeout overrides the ipc.ping.interval and client will throw exception
- instead of sending ping when the interval is passed.
+ If ipc.client.ping is set to true and this rpc-timeout is greater than
+ the value of ipc.ping.interval, the effective value of the rpc-timeout is
+ rounded up to multiple of ipc.ping.interval.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 531db80a932..d600e014f81 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -1130,14 +1130,67 @@ public class TestRPC extends TestRpcBase {
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true);
server = setupTestServer(builder);
- conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
try {
- proxy = getClient(addr, conf);
- proxy.sleep(null, newSleepRequest(3000));
- fail("RPC should time out.");
- } catch (ServiceException e) {
- assertTrue(e.getCause() instanceof SocketTimeoutException);
- LOG.info("got expected timeout.", e);
+ // Test RPC timeout with default ipc.client.ping.
+ try {
+ Configuration c = new Configuration(conf);
+ c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
+ proxy = getClient(addr, c);
+ proxy.sleep(null, newSleepRequest(3000));
+ fail("RPC should time out.");
+ } catch (ServiceException e) {
+ assertTrue(e.getCause() instanceof SocketTimeoutException);
+ LOG.info("got expected timeout.", e);
+ }
+
+ // Test RPC timeout when ipc.client.ping is false.
+ try {
+ Configuration c = new Configuration(conf);
+ c.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
+ c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
+ proxy = getClient(addr, c);
+ proxy.sleep(null, newSleepRequest(3000));
+ fail("RPC should time out.");
+ } catch (ServiceException e) {
+ assertTrue(e.getCause() instanceof SocketTimeoutException);
+ LOG.info("got expected timeout.", e);
+ }
+
+ // Test negative timeout value.
+ try {
+ Configuration c = new Configuration(conf);
+ c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, -1);
+ proxy = getClient(addr, c);
+ proxy.sleep(null, newSleepRequest(2000));
+ } catch (ServiceException e) {
+ LOG.info("got unexpected exception.", e);
+ fail("RPC should not time out.");
+ }
+
+ // Test RPC timeout greater than ipc.ping.interval.
+ try {
+ Configuration c = new Configuration(conf);
+ c.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
+ c.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 800);
+ c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
+ proxy = getClient(addr, c);
+
+ try {
+ // should not time out because effective rpc-timeout is
+ // multiple of ping interval: 1600 (= 800 * (1000 / 800 + 1))
+ proxy.sleep(null, newSleepRequest(1300));
+ } catch (ServiceException e) {
+ LOG.info("got unexpected exception.", e);
+ fail("RPC should not time out.");
+ }
+
+ proxy.sleep(null, newSleepRequest(2000));
+ fail("RPC should time out.");
+ } catch (ServiceException e) {
+ assertTrue(e.getCause() instanceof SocketTimeoutException);
+ LOG.info("got expected timeout.", e);
+ }
+
} finally {
stop(server, proxy);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index ed6cd23d804..8848f86dda8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -141,7 +141,7 @@ public class DfsClientConf {
public DfsClientConf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout
- hdfsTimeout = Client.getTimeout(conf);
+ hdfsTimeout = Client.getRpcTimeout(conf);
maxRetryAttempts = conf.getInt(
Retry.MAX_ATTEMPTS_KEY,