HADOOP-11252. RPC client does not time out by default. Contributed by Wilfred Spiegelenburg and Masatake Iwasaki.

(cherry picked from commit 64ae85fd2e)
This commit is contained in:
Akira Ajisaka 2016-01-04 14:31:22 +09:00
parent ae25e283cf
commit d98cfe611f
7 changed files with 87 additions and 8 deletions

View File

@ -1768,6 +1768,9 @@ Release 2.6.4 - UNRELEASED
BUG FIXES
HADOOP-11252. RPC client does not time out by default.
(Wilfred Spiegelenburg and Masatake Iwasaki via aajisaka)
Release 2.6.3 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -52,6 +52,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final String IPC_CLIENT_PING_KEY = "ipc.client.ping";
/** Default value of IPC_CLIENT_PING_KEY */
public static final boolean IPC_CLIENT_PING_DEFAULT = true;
/** Timeout value for RPC client on waiting for response. */
public static final String IPC_CLIENT_RPC_TIMEOUT_KEY =
"ipc.client.rpc-timeout.ms";
/** Default value for IPC_CLIENT_RPC_TIMEOUT_KEY. */
public static final int IPC_CLIENT_RPC_TIMEOUT_DEFAULT = 0;
/** Responses larger than this will be logged */
public static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY =
"ipc.server.max.response.size";

View File

@ -215,7 +215,7 @@ public class Client {
* @param conf Configuration
* @param pingInterval the ping interval
*/
final public static void setPingInterval(Configuration conf, int pingInterval) {
static final void setPingInterval(Configuration conf, int pingInterval) {
conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval);
}
@ -226,7 +226,7 @@ public class Client {
* @param conf Configuration
* @return the ping interval
*/
final public static int getPingInterval(Configuration conf) {
static final int getPingInterval(Configuration conf) {
return conf.getInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,
CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
}

View File

@ -40,6 +40,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
@ -343,7 +344,8 @@ public class RPC {
long clientVersion,
InetSocketAddress addr, Configuration conf,
long connTimeout) throws IOException {
return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, null, connTimeout);
return waitForProtocolProxy(protocol, clientVersion, addr, conf,
getRpcTimeout(conf), null, connTimeout);
}
/**
@ -487,8 +489,8 @@ public class RPC {
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory) throws IOException {
return getProtocolProxy(
protocol, clientVersion, addr, ticket, conf, factory, 0, null);
return getProtocolProxy(protocol, clientVersion, addr, ticket, conf,
factory, getRpcTimeout(conf), null);
}
/**
@ -682,6 +684,17 @@ public class RPC {
+ "does not provide closeable invocation handler "
+ proxy.getClass());
}
/**
* Get the RPC time from configuration;
* If not set in the configuration, return the default value.
*
* @param conf Configuration
* @return the RPC timeout (ms)
*/
public static int getRpcTimeout(Configuration conf) {
return conf.getInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY,
CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_DEFAULT);
}
/**
* Class to construct instances of RPC server with specific options.

View File

@ -1067,6 +1067,35 @@ for ldap providers in the same way as above does.
</description>
</property>
<property>
<name>ipc.client.ping</name>
<value>true</value>
<description>Send a ping to the server when timeout on reading the response,
if set to true. If no failure is detected, the client retries until at least
a byte is read.
</description>
</property>
<property>
<name>ipc.ping.interval</name>
<value>60000</value>
<description>Timeout on waiting response from server, in milliseconds.
The client will send ping when the interval is passed without receiving bytes,
if ipc.client.ping is set to true.
</description>
</property>
<property>
<name>ipc.client.rpc-timeout.ms</name>
<value>0</value>
<description>Timeout on waiting response from server, in milliseconds.
Currently this timeout works only when ipc.client.ping is set to true
because it uses the same facilities with IPC ping.
The timeout overrides the ipc.ping.interval and client will throw exception
instead of sending ping when the interval is passed.
</description>
</property>
<property>
<name>ipc.server.listen.queue.size</name>
<value>128</value>

View File

@ -39,6 +39,7 @@ import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -1142,6 +1143,35 @@ public class TestRPC {
assertTrue("RetriableException not received", succeeded);
}
/**
* Test RPC timeout.
*/
@Test(timeout=30000)
public void testClientRpcTimeout() throws Exception {
final Server server = new RPC.Builder(conf)
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
.setBindAddress(ADDRESS).setPort(0)
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
.build();
server.start();
final Configuration conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
final TestProtocol proxy =
RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
NetUtils.getConnectAddress(server), conf);
try {
proxy.sleep(3000);
fail("RPC should time out.");
} catch (SocketTimeoutException e) {
LOG.info("got expected timeout.", e);
} finally {
server.stop();
RPC.stopProxy(proxy);
}
}
public static void main(String[] args) throws IOException {
new TestRPC().testCallsInternal(conf);

View File

@ -103,10 +103,9 @@ public class DatanodeProtocolClientSideTranslatorPB implements
private static DatanodeProtocolPB createNamenode(
InetSocketAddress nameNodeAddr, Configuration conf,
UserGroupInformation ugi) throws IOException {
return RPC.getProtocolProxy(DatanodeProtocolPB.class,
return RPC.getProxy(DatanodeProtocolPB.class,
RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi,
conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class),
org.apache.hadoop.ipc.Client.getPingInterval(conf), null).getProxy();
conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class));
}
@Override