HADOOP-11252. RPC client does not time out by default. Contributed by Wilfred Spiegelenburg and Masatake Iwasaki.
(cherry picked from commit64ae85fd2e
) (cherry picked from commitd98cfe611f
)
This commit is contained in:
parent
cdf9f1c662
commit
3a1c98bba3
|
@ -854,6 +854,9 @@ Release 2.6.4 - UNRELEASED
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
|
HADOOP-11252. RPC client does not time out by default.
|
||||||
|
(Wilfred Spiegelenburg and Masatake Iwasaki via aajisaka)
|
||||||
|
|
||||||
Release 2.6.3 - UNRELEASED
|
Release 2.6.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -52,6 +52,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
|
||||||
public static final String IPC_CLIENT_PING_KEY = "ipc.client.ping";
|
public static final String IPC_CLIENT_PING_KEY = "ipc.client.ping";
|
||||||
/** Default value of IPC_CLIENT_PING_KEY */
|
/** Default value of IPC_CLIENT_PING_KEY */
|
||||||
public static final boolean IPC_CLIENT_PING_DEFAULT = true;
|
public static final boolean IPC_CLIENT_PING_DEFAULT = true;
|
||||||
|
/** Timeout value for RPC client on waiting for response. */
|
||||||
|
public static final String IPC_CLIENT_RPC_TIMEOUT_KEY =
|
||||||
|
"ipc.client.rpc-timeout.ms";
|
||||||
|
/** Default value for IPC_CLIENT_RPC_TIMEOUT_KEY. */
|
||||||
|
public static final int IPC_CLIENT_RPC_TIMEOUT_DEFAULT = 0;
|
||||||
/** Responses larger than this will be logged */
|
/** Responses larger than this will be logged */
|
||||||
public static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY =
|
public static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY =
|
||||||
"ipc.server.max.response.size";
|
"ipc.server.max.response.size";
|
||||||
|
|
|
@ -212,7 +212,7 @@ public class Client {
|
||||||
* @param conf Configuration
|
* @param conf Configuration
|
||||||
* @param pingInterval the ping interval
|
* @param pingInterval the ping interval
|
||||||
*/
|
*/
|
||||||
final public static void setPingInterval(Configuration conf, int pingInterval) {
|
static final void setPingInterval(Configuration conf, int pingInterval) {
|
||||||
conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval);
|
conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -223,7 +223,7 @@ public class Client {
|
||||||
* @param conf Configuration
|
* @param conf Configuration
|
||||||
* @return the ping interval
|
* @return the ping interval
|
||||||
*/
|
*/
|
||||||
final public static int getPingInterval(Configuration conf) {
|
static final int getPingInterval(Configuration conf) {
|
||||||
return conf.getInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,
|
return conf.getInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,
|
||||||
CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
|
CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@ import javax.net.SocketFactory;
|
||||||
import org.apache.commons.logging.*;
|
import org.apache.commons.logging.*;
|
||||||
|
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.io.*;
|
import org.apache.hadoop.io.*;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||||
|
@ -343,7 +344,8 @@ public class RPC {
|
||||||
long clientVersion,
|
long clientVersion,
|
||||||
InetSocketAddress addr, Configuration conf,
|
InetSocketAddress addr, Configuration conf,
|
||||||
long connTimeout) throws IOException {
|
long connTimeout) throws IOException {
|
||||||
return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, null, connTimeout);
|
return waitForProtocolProxy(protocol, clientVersion, addr, conf,
|
||||||
|
getRpcTimeout(conf), null, connTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -487,8 +489,8 @@ public class RPC {
|
||||||
UserGroupInformation ticket,
|
UserGroupInformation ticket,
|
||||||
Configuration conf,
|
Configuration conf,
|
||||||
SocketFactory factory) throws IOException {
|
SocketFactory factory) throws IOException {
|
||||||
return getProtocolProxy(
|
return getProtocolProxy(protocol, clientVersion, addr, ticket, conf,
|
||||||
protocol, clientVersion, addr, ticket, conf, factory, 0, null);
|
factory, getRpcTimeout(conf), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -682,6 +684,17 @@ public class RPC {
|
||||||
+ "does not provide closeable invocation handler "
|
+ "does not provide closeable invocation handler "
|
||||||
+ proxy.getClass());
|
+ proxy.getClass());
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* Get the RPC time from configuration;
|
||||||
|
* If not set in the configuration, return the default value.
|
||||||
|
*
|
||||||
|
* @param conf Configuration
|
||||||
|
* @return the RPC timeout (ms)
|
||||||
|
*/
|
||||||
|
public static int getRpcTimeout(Configuration conf) {
|
||||||
|
return conf.getInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY,
|
||||||
|
CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class to construct instances of RPC server with specific options.
|
* Class to construct instances of RPC server with specific options.
|
||||||
|
|
|
@ -975,6 +975,35 @@ for ldap providers in the same way as above does.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>ipc.client.ping</name>
|
||||||
|
<value>true</value>
|
||||||
|
<description>Send a ping to the server when timeout on reading the response,
|
||||||
|
if set to true. If no failure is detected, the client retries until at least
|
||||||
|
a byte is read.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>ipc.ping.interval</name>
|
||||||
|
<value>60000</value>
|
||||||
|
<description>Timeout on waiting response from server, in milliseconds.
|
||||||
|
The client will send ping when the interval is passed without receiving bytes,
|
||||||
|
if ipc.client.ping is set to true.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>ipc.client.rpc-timeout.ms</name>
|
||||||
|
<value>0</value>
|
||||||
|
<description>Timeout on waiting response from server, in milliseconds.
|
||||||
|
Currently this timeout works only when ipc.client.ping is set to true
|
||||||
|
because it uses the same facilities with IPC ping.
|
||||||
|
The timeout overrides the ipc.ping.interval and client will throw exception
|
||||||
|
instead of sending ping when the interval is passed.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>ipc.server.listen.queue.size</name>
|
<name>ipc.server.listen.queue.size</name>
|
||||||
<value>128</value>
|
<value>128</value>
|
||||||
|
|
|
@ -38,6 +38,7 @@ import java.lang.reflect.Proxy;
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.SocketTimeoutException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -1077,6 +1078,35 @@ public class TestRPC {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test RPC timeout.
|
||||||
|
*/
|
||||||
|
@Test(timeout=30000)
|
||||||
|
public void testClientRpcTimeout() throws Exception {
|
||||||
|
final Server server = new RPC.Builder(conf)
|
||||||
|
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
|
||||||
|
.setBindAddress(ADDRESS).setPort(0)
|
||||||
|
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
|
||||||
|
.build();
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
|
||||||
|
final TestProtocol proxy =
|
||||||
|
RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
|
||||||
|
NetUtils.getConnectAddress(server), conf);
|
||||||
|
|
||||||
|
try {
|
||||||
|
proxy.sleep(3000);
|
||||||
|
fail("RPC should time out.");
|
||||||
|
} catch (SocketTimeoutException e) {
|
||||||
|
LOG.info("got expected timeout.", e);
|
||||||
|
} finally {
|
||||||
|
server.stop();
|
||||||
|
RPC.stopProxy(proxy);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws IOException {
|
public static void main(String[] args) throws IOException {
|
||||||
new TestRPC().testCallsInternal(conf);
|
new TestRPC().testCallsInternal(conf);
|
||||||
|
|
||||||
|
|
|
@ -103,10 +103,9 @@ public class DatanodeProtocolClientSideTranslatorPB implements
|
||||||
private static DatanodeProtocolPB createNamenode(
|
private static DatanodeProtocolPB createNamenode(
|
||||||
InetSocketAddress nameNodeAddr, Configuration conf,
|
InetSocketAddress nameNodeAddr, Configuration conf,
|
||||||
UserGroupInformation ugi) throws IOException {
|
UserGroupInformation ugi) throws IOException {
|
||||||
return RPC.getProtocolProxy(DatanodeProtocolPB.class,
|
return RPC.getProxy(DatanodeProtocolPB.class,
|
||||||
RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi,
|
RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi,
|
||||||
conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class),
|
conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class));
|
||||||
org.apache.hadoop.ipc.Client.getPingInterval(conf), null).getProxy();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue