HADOOP-12672. RPC timeout should not override IPC ping interval (iwasakims)
(cherry picked from commit 682adc6ba9
)
This commit is contained in:
parent
b0ea50bb29
commit
7994ab3fa0
|
@ -386,7 +386,7 @@ public class Client {
|
||||||
private Socket socket = null; // connected socket
|
private Socket socket = null; // connected socket
|
||||||
private DataInputStream in;
|
private DataInputStream in;
|
||||||
private DataOutputStream out;
|
private DataOutputStream out;
|
||||||
private int rpcTimeout;
|
private final int rpcTimeout;
|
||||||
private int maxIdleTime; //connections will be culled if it was idle for
|
private int maxIdleTime; //connections will be culled if it was idle for
|
||||||
//maxIdleTime msecs
|
//maxIdleTime msecs
|
||||||
private final RetryPolicy connectionRetryPolicy;
|
private final RetryPolicy connectionRetryPolicy;
|
||||||
|
@ -394,8 +394,9 @@ public class Client {
|
||||||
private int maxRetriesOnSocketTimeouts;
|
private int maxRetriesOnSocketTimeouts;
|
||||||
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
||||||
private final boolean tcpLowLatency; // if T then use low-delay QoS
|
private final boolean tcpLowLatency; // if T then use low-delay QoS
|
||||||
private boolean doPing; //do we need to send ping message
|
private final boolean doPing; //do we need to send ping message
|
||||||
private int pingInterval; // how often sends ping to the server in msecs
|
private final int pingInterval; // how often sends ping to the server
|
||||||
|
private final int soTimeout; // used by ipc ping and rpc timeout
|
||||||
private ByteArrayOutputStream pingRequest; // ping message
|
private ByteArrayOutputStream pingRequest; // ping message
|
||||||
|
|
||||||
// currently active calls
|
// currently active calls
|
||||||
|
@ -434,6 +435,9 @@ public class Client {
|
||||||
pingHeader.writeDelimitedTo(pingRequest);
|
pingHeader.writeDelimitedTo(pingRequest);
|
||||||
}
|
}
|
||||||
this.pingInterval = remoteId.getPingInterval();
|
this.pingInterval = remoteId.getPingInterval();
|
||||||
|
this.soTimeout =
|
||||||
|
(rpcTimeout == 0 || (doPing && pingInterval < rpcTimeout))?
|
||||||
|
this.pingInterval : this.rpcTimeout;
|
||||||
this.serviceClass = serviceClass;
|
this.serviceClass = serviceClass;
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("The ping interval is " + this.pingInterval + " ms.");
|
LOG.debug("The ping interval is " + this.pingInterval + " ms.");
|
||||||
|
@ -484,12 +488,12 @@ public class Client {
|
||||||
|
|
||||||
/* Process timeout exception
|
/* Process timeout exception
|
||||||
* if the connection is not going to be closed or
|
* if the connection is not going to be closed or
|
||||||
* is not configured to have a RPC timeout, send a ping.
|
* the RPC is not timed out yet, send a ping.
|
||||||
* (if rpcTimeout is not set to be 0, then RPC should timeout.
|
|
||||||
* otherwise, throw the timeout exception.
|
|
||||||
*/
|
*/
|
||||||
private void handleTimeout(SocketTimeoutException e) throws IOException {
|
private void handleTimeout(SocketTimeoutException e, int waiting)
|
||||||
if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {
|
throws IOException {
|
||||||
|
if (shouldCloseConnection.get() || !running.get() ||
|
||||||
|
(0 < rpcTimeout && rpcTimeout <= waiting)) {
|
||||||
throw e;
|
throw e;
|
||||||
} else {
|
} else {
|
||||||
sendPing();
|
sendPing();
|
||||||
|
@ -503,11 +507,13 @@ public class Client {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public int read() throws IOException {
|
public int read() throws IOException {
|
||||||
|
int waiting = 0;
|
||||||
do {
|
do {
|
||||||
try {
|
try {
|
||||||
return super.read();
|
return super.read();
|
||||||
} catch (SocketTimeoutException e) {
|
} catch (SocketTimeoutException e) {
|
||||||
handleTimeout(e);
|
waiting += soTimeout;
|
||||||
|
handleTimeout(e, waiting);
|
||||||
}
|
}
|
||||||
} while (true);
|
} while (true);
|
||||||
}
|
}
|
||||||
|
@ -520,11 +526,13 @@ public class Client {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public int read(byte[] buf, int off, int len) throws IOException {
|
public int read(byte[] buf, int off, int len) throws IOException {
|
||||||
|
int waiting = 0;
|
||||||
do {
|
do {
|
||||||
try {
|
try {
|
||||||
return super.read(buf, off, len);
|
return super.read(buf, off, len);
|
||||||
} catch (SocketTimeoutException e) {
|
} catch (SocketTimeoutException e) {
|
||||||
handleTimeout(e);
|
waiting += soTimeout;
|
||||||
|
handleTimeout(e, waiting);
|
||||||
}
|
}
|
||||||
} while (true);
|
} while (true);
|
||||||
}
|
}
|
||||||
|
@ -632,10 +640,7 @@ public class Client {
|
||||||
}
|
}
|
||||||
|
|
||||||
NetUtils.connect(this.socket, server, connectionTimeout);
|
NetUtils.connect(this.socket, server, connectionTimeout);
|
||||||
if (rpcTimeout > 0) {
|
this.socket.setSoTimeout(soTimeout);
|
||||||
pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
|
|
||||||
}
|
|
||||||
this.socket.setSoTimeout(pingInterval);
|
|
||||||
return;
|
return;
|
||||||
} catch (ConnectTimeoutException toe) {
|
} catch (ConnectTimeoutException toe) {
|
||||||
/* Check for an address change and update the local reference.
|
/* Check for an address change and update the local reference.
|
||||||
|
|
|
@ -1055,7 +1055,7 @@
|
||||||
<value>true</value>
|
<value>true</value>
|
||||||
<description>Send a ping to the server when timeout on reading the response,
|
<description>Send a ping to the server when timeout on reading the response,
|
||||||
if set to true. If no failure is detected, the client retries until at least
|
if set to true. If no failure is detected, the client retries until at least
|
||||||
a byte is read.
|
a byte is read or the time given by ipc.client.rpc-timeout.ms is passed.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
@ -1072,10 +1072,9 @@
|
||||||
<name>ipc.client.rpc-timeout.ms</name>
|
<name>ipc.client.rpc-timeout.ms</name>
|
||||||
<value>0</value>
|
<value>0</value>
|
||||||
<description>Timeout on waiting response from server, in milliseconds.
|
<description>Timeout on waiting response from server, in milliseconds.
|
||||||
Currently this timeout works only when ipc.client.ping is set to true
|
If ipc.client.ping is set to true and this rpc-timeout is greater than
|
||||||
because it uses the same facilities with IPC ping.
|
the value of ipc.ping.interval, the effective value of the rpc-timeout is
|
||||||
The timeout overrides the ipc.ping.interval and client will throw exception
|
rounded up to multiple of ipc.ping.interval.
|
||||||
instead of sending ping when the interval is passed.
|
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
|
|
@ -1055,6 +1055,74 @@ public class TestRPC extends TestRpcBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test RPC timeout when ipc.client.ping is false.
|
||||||
|
*/
|
||||||
|
@Test(timeout=30000)
|
||||||
|
public void testClientRpcTimeoutWithoutPing() throws Exception {
|
||||||
|
final Server server = new RPC.Builder(conf)
|
||||||
|
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
|
||||||
|
.setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
|
||||||
|
.build();
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
|
||||||
|
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
|
||||||
|
final TestProtocol proxy =
|
||||||
|
RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
|
||||||
|
NetUtils.getConnectAddress(server), conf);
|
||||||
|
|
||||||
|
try {
|
||||||
|
proxy.sleep(3000);
|
||||||
|
fail("RPC should time out.");
|
||||||
|
} catch (SocketTimeoutException e) {
|
||||||
|
LOG.info("got expected timeout.", e);
|
||||||
|
} finally {
|
||||||
|
server.stop();
|
||||||
|
RPC.stopProxy(proxy);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test RPC timeout greater than ipc.ping.interval.
|
||||||
|
*/
|
||||||
|
@Test(timeout=30000)
|
||||||
|
public void testClientRpcTimeoutGreaterThanPingInterval() throws Exception {
|
||||||
|
final Server server = new RPC.Builder(conf)
|
||||||
|
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
|
||||||
|
.setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
|
||||||
|
.build();
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
|
||||||
|
conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 800);
|
||||||
|
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
|
||||||
|
final TestProtocol proxy =
|
||||||
|
RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
|
||||||
|
NetUtils.getConnectAddress(server), conf);
|
||||||
|
|
||||||
|
// should not time out.
|
||||||
|
proxy.sleep(300);
|
||||||
|
|
||||||
|
// should not time out because effective rpc-timeout is
|
||||||
|
// multiple of ping interval: 1600 (= 800 * (1000 / 800 + 1))
|
||||||
|
proxy.sleep(1300);
|
||||||
|
|
||||||
|
try {
|
||||||
|
proxy.sleep(2000);
|
||||||
|
fail("RPC should time out.");
|
||||||
|
} catch (SocketTimeoutException e) {
|
||||||
|
LOG.info("got expected timeout.", e);
|
||||||
|
} finally {
|
||||||
|
server.stop();
|
||||||
|
RPC.stopProxy(proxy);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
new TestRPC().testCallsInternal(conf);
|
new TestRPC().testCallsInternal(conf);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue