HADOOP-12672. RPC timeout should not override IPC ping interval (iwasakims)

(cherry picked from commit 85ec5573eb)
This commit is contained in:
Masatake Iwasaki 2016-04-06 03:22:48 +09:00
parent 0907ce8c93
commit 2542e9bccf
4 changed files with 108 additions and 27 deletions

View File

@ -239,14 +239,33 @@ static final int getPingInterval(Configuration conf) {
* *
* @param conf Configuration * @param conf Configuration
* @return the timeout period in milliseconds. -1 if no timeout value is set * @return the timeout period in milliseconds. -1 if no timeout value is set
* @deprecated use {@link #getRpcTimeout(Configuration)} instead
*/ */
@Deprecated
final public static int getTimeout(Configuration conf) { final public static int getTimeout(Configuration conf) {
int timeout = getRpcTimeout(conf);
if (timeout > 0) {
return timeout;
}
if (!conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, if (!conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY,
CommonConfigurationKeys.IPC_CLIENT_PING_DEFAULT)) { CommonConfigurationKeys.IPC_CLIENT_PING_DEFAULT)) {
return getPingInterval(conf); return getPingInterval(conf);
} }
return -1; return -1;
} }
/**
* The time after which a RPC will timeout.
*
* @param conf Configuration
* @return the timeout period in milliseconds.
*/
public static final int getRpcTimeout(Configuration conf) {
int timeout =
conf.getInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY,
CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_DEFAULT);
return (timeout < 0) ? 0 : timeout;
}
/** /**
* set the connection timeout value in configuration * set the connection timeout value in configuration
* *
@ -386,7 +405,7 @@ private class Connection extends Thread {
private Socket socket = null; // connected socket private Socket socket = null; // connected socket
private DataInputStream in; private DataInputStream in;
private DataOutputStream out; private DataOutputStream out;
private int rpcTimeout; private final int rpcTimeout;
private int maxIdleTime; //connections will be culled if it was idle for private int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs //maxIdleTime msecs
private final RetryPolicy connectionRetryPolicy; private final RetryPolicy connectionRetryPolicy;
@ -394,8 +413,9 @@ private class Connection extends Thread {
private int maxRetriesOnSocketTimeouts; private int maxRetriesOnSocketTimeouts;
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private final boolean tcpLowLatency; // if T then use low-delay QoS private final boolean tcpLowLatency; // if T then use low-delay QoS
private boolean doPing; //do we need to send ping message private final boolean doPing; //do we need to send ping message
private int pingInterval; // how often sends ping to the server in msecs private final int pingInterval; // how often sends ping to the server
private final int soTimeout; // used by ipc ping and rpc timeout
private ByteArrayOutputStream pingRequest; // ping message private ByteArrayOutputStream pingRequest; // ping message
// currently active calls // currently active calls
@ -434,6 +454,14 @@ public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
pingHeader.writeDelimitedTo(pingRequest); pingHeader.writeDelimitedTo(pingRequest);
} }
this.pingInterval = remoteId.getPingInterval(); this.pingInterval = remoteId.getPingInterval();
if (rpcTimeout > 0) {
// effective rpc timeout is rounded up to multiple of pingInterval
// if pingInterval < rpcTimeout.
this.soTimeout = (doPing && pingInterval < rpcTimeout) ?
pingInterval : rpcTimeout;
} else {
this.soTimeout = pingInterval;
}
this.serviceClass = serviceClass; this.serviceClass = serviceClass;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("The ping interval is " + this.pingInterval + " ms."); LOG.debug("The ping interval is " + this.pingInterval + " ms.");
@ -484,12 +512,12 @@ protected PingInputStream(InputStream in) {
/* Process timeout exception /* Process timeout exception
* if the connection is not going to be closed or * if the connection is not going to be closed or
* is not configured to have a RPC timeout, send a ping. * the RPC is not timed out yet, send a ping.
* (if rpcTimeout is not set to be 0, then RPC should timeout.
* otherwise, throw the timeout exception.
*/ */
private void handleTimeout(SocketTimeoutException e) throws IOException { private void handleTimeout(SocketTimeoutException e, int waiting)
if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) { throws IOException {
if (shouldCloseConnection.get() || !running.get() ||
(0 < rpcTimeout && rpcTimeout <= waiting)) {
throw e; throw e;
} else { } else {
sendPing(); sendPing();
@ -503,11 +531,13 @@ private void handleTimeout(SocketTimeoutException e) throws IOException {
*/ */
@Override @Override
public int read() throws IOException { public int read() throws IOException {
int waiting = 0;
do { do {
try { try {
return super.read(); return super.read();
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
handleTimeout(e); waiting += soTimeout;
handleTimeout(e, waiting);
} }
} while (true); } while (true);
} }
@ -520,11 +550,13 @@ public int read() throws IOException {
*/ */
@Override @Override
public int read(byte[] buf, int off, int len) throws IOException { public int read(byte[] buf, int off, int len) throws IOException {
int waiting = 0;
do { do {
try { try {
return super.read(buf, off, len); return super.read(buf, off, len);
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
handleTimeout(e); waiting += soTimeout;
handleTimeout(e, waiting);
} }
} while (true); } while (true);
} }
@ -632,10 +664,7 @@ private synchronized void setupConnection() throws IOException {
} }
NetUtils.connect(this.socket, server, connectionTimeout); NetUtils.connect(this.socket, server, connectionTimeout);
if (rpcTimeout > 0) { this.socket.setSoTimeout(soTimeout);
pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
}
this.socket.setSoTimeout(pingInterval);
return; return;
} catch (ConnectTimeoutException toe) { } catch (ConnectTimeoutException toe) {
/* Check for an address change and update the local reference. /* Check for an address change and update the local reference.

View File

@ -1055,7 +1055,7 @@
<value>true</value> <value>true</value>
<description>Send a ping to the server when timeout on reading the response, <description>Send a ping to the server when timeout on reading the response,
if set to true. If no failure is detected, the client retries until at least if set to true. If no failure is detected, the client retries until at least
a byte is read. a byte is read or the time given by ipc.client.rpc-timeout.ms is passed.
</description> </description>
</property> </property>
@ -1072,10 +1072,9 @@
<name>ipc.client.rpc-timeout.ms</name> <name>ipc.client.rpc-timeout.ms</name>
<value>0</value> <value>0</value>
<description>Timeout on waiting response from server, in milliseconds. <description>Timeout on waiting response from server, in milliseconds.
Currently this timeout works only when ipc.client.ping is set to true If ipc.client.ping is set to true and this rpc-timeout is greater than
because it uses the same facilities with IPC ping. the value of ipc.ping.interval, the effective value of the rpc-timeout is
The timeout overrides the ipc.ping.interval and client will throw exception rounded up to multiple of ipc.ping.interval.
instead of sending ping when the interval is passed.
</description> </description>
</property> </property>

View File

@ -1130,14 +1130,67 @@ public void testClientRpcTimeout() throws Exception {
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true); .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true);
server = setupTestServer(builder); server = setupTestServer(builder);
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
try { try {
proxy = getClient(addr, conf); // Test RPC timeout with default ipc.client.ping.
try {
Configuration c = new Configuration(conf);
c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
proxy = getClient(addr, c);
proxy.sleep(null, newSleepRequest(3000)); proxy.sleep(null, newSleepRequest(3000));
fail("RPC should time out."); fail("RPC should time out.");
} catch (ServiceException e) { } catch (ServiceException e) {
assertTrue(e.getCause() instanceof SocketTimeoutException); assertTrue(e.getCause() instanceof SocketTimeoutException);
LOG.info("got expected timeout.", e); LOG.info("got expected timeout.", e);
}
// Test RPC timeout when ipc.client.ping is false.
try {
Configuration c = new Configuration(conf);
c.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
proxy = getClient(addr, c);
proxy.sleep(null, newSleepRequest(3000));
fail("RPC should time out.");
} catch (ServiceException e) {
assertTrue(e.getCause() instanceof SocketTimeoutException);
LOG.info("got expected timeout.", e);
}
// Test negative timeout value.
try {
Configuration c = new Configuration(conf);
c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, -1);
proxy = getClient(addr, c);
proxy.sleep(null, newSleepRequest(2000));
} catch (ServiceException e) {
LOG.info("got unexpected exception.", e);
fail("RPC should not time out.");
}
// Test RPC timeout greater than ipc.ping.interval.
try {
Configuration c = new Configuration(conf);
c.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
c.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 800);
c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
proxy = getClient(addr, c);
try {
// should not time out because effective rpc-timeout is
// multiple of ping interval: 1600 (= 800 * (1000 / 800 + 1))
proxy.sleep(null, newSleepRequest(1300));
} catch (ServiceException e) {
LOG.info("got unexpected exception.", e);
fail("RPC should not time out.");
}
proxy.sleep(null, newSleepRequest(2000));
fail("RPC should time out.");
} catch (ServiceException e) {
assertTrue(e.getCause() instanceof SocketTimeoutException);
LOG.info("got expected timeout.", e);
}
} finally { } finally {
stop(server, proxy); stop(server, proxy);
} }

View File

@ -141,7 +141,7 @@ public class DfsClientConf {
public DfsClientConf(Configuration conf) { public DfsClientConf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout // The hdfsTimeout is currently the same as the ipc timeout
hdfsTimeout = Client.getTimeout(conf); hdfsTimeout = Client.getRpcTimeout(conf);
maxRetryAttempts = conf.getInt( maxRetryAttempts = conf.getInt(
Retry.MAX_ATTEMPTS_KEY, Retry.MAX_ATTEMPTS_KEY,