From 3a1c98bba35899c50d17a44cdcd78db295b1e7f6 Mon Sep 17 00:00:00 2001 From: Akira Ajisaka Date: Mon, 4 Jan 2016 14:31:22 +0900 Subject: [PATCH] HADOOP-11252. RPC client does not time out by default. Contributed by Wilfred Spiegelenburg and Masatake Iwasaki. (cherry picked from commit 64ae85fd2ea91f46ab3b21f007befbeef8c3c947) (cherry picked from commit d98cfe611f7cedc58cdecfb5a83829adb2521138) --- .../hadoop-common/CHANGES.txt | 3 ++ .../hadoop/fs/CommonConfigurationKeys.java | 5 ++++ .../java/org/apache/hadoop/ipc/Client.java | 4 +-- .../main/java/org/apache/hadoop/ipc/RPC.java | 19 ++++++++++-- .../src/main/resources/core-default.xml | 29 ++++++++++++++++++ .../java/org/apache/hadoop/ipc/TestRPC.java | 30 +++++++++++++++++++ ...atanodeProtocolClientSideTranslatorPB.java | 5 ++-- 7 files changed, 87 insertions(+), 8 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 7f048641ca8..01193f8df79 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -854,6 +854,9 @@ Release 2.6.4 - UNRELEASED BUG FIXES + HADOOP-11252. RPC client does not time out by default. + (Wilfred Spiegelenburg and Masatake Iwasaki via aajisaka) + Release 2.6.3 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 757549600bd..b0b602bf9e4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -52,6 +52,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final String IPC_CLIENT_PING_KEY = "ipc.client.ping"; /** Default value of IPC_CLIENT_PING_KEY */ public static final boolean IPC_CLIENT_PING_DEFAULT = true; + /** Timeout value for RPC client on waiting for response. */ + public static final String IPC_CLIENT_RPC_TIMEOUT_KEY = + "ipc.client.rpc-timeout.ms"; + /** Default value for IPC_CLIENT_RPC_TIMEOUT_KEY. */ + public static final int IPC_CLIENT_RPC_TIMEOUT_DEFAULT = 0; /** Responses larger than this will be logged */ public static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY = "ipc.server.max.response.size"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 99b0b626fa4..42b99345bd6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -212,7 +212,7 @@ public class Client { * @param conf Configuration * @param pingInterval the ping interval */ - final public static void setPingInterval(Configuration conf, int pingInterval) { + static final void setPingInterval(Configuration conf, int pingInterval) { conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval); } @@ -223,7 +223,7 @@ public class Client { * @param conf Configuration * @return the ping interval */ - final public static int getPingInterval(Configuration conf) { + static final int getPingInterval(Configuration conf) { return conf.getInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 797b719c8f2..297378109e9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -40,6 +40,7 @@ import javax.net.SocketFactory; import org.apache.commons.logging.*; import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.*; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; @@ -343,7 +344,8 @@ public class RPC { long clientVersion, InetSocketAddress addr, Configuration conf, long connTimeout) throws IOException { - return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, null, connTimeout); + return waitForProtocolProxy(protocol, clientVersion, addr, conf, + getRpcTimeout(conf), null, connTimeout); } /** @@ -487,8 +489,8 @@ public class RPC { UserGroupInformation ticket, Configuration conf, SocketFactory factory) throws IOException { - return getProtocolProxy( - protocol, clientVersion, addr, ticket, conf, factory, 0, null); + return getProtocolProxy(protocol, clientVersion, addr, ticket, conf, + factory, getRpcTimeout(conf), null); } /** @@ -682,6 +684,17 @@ public class RPC { + "does not provide closeable invocation handler " + proxy.getClass()); } + /** + * Get the RPC time from configuration; + * If not set in the configuration, return the default value. + * + * @param conf Configuration + * @return the RPC timeout (ms) + */ + public static int getRpcTimeout(Configuration conf) { + return conf.getInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, + CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_DEFAULT); + } /** * Class to construct instances of RPC server with specific options. diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 4ca15b93b5d..96bac4d4aff 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -975,6 +975,35 @@ for ldap providers in the same way as above does. + + ipc.client.ping + true + Send a ping to the server when timeout on reading the response, + if set to true. If no failure is detected, the client retries until at least + a byte is read. + + + + + ipc.ping.interval + 60000 + Timeout on waiting response from server, in milliseconds. + The client will send ping when the interval is passed without receiving bytes, + if ipc.client.ping is set to true. + + + + + ipc.client.rpc-timeout.ms + 0 + Timeout on waiting response from server, in milliseconds. + Currently this timeout works only when ipc.client.ping is set to true + because it uses the same facilities with IPC ping. + The timeout overrides the ipc.ping.interval and client will throw exception + instead of sending ping when the interval is passed. + + + ipc.server.listen.queue.size 128 diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 8a4dcb67bb9..b3e0553190d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -38,6 +38,7 @@ import java.lang.reflect.Proxy; import java.net.ConnectException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -1077,6 +1078,35 @@ public class TestRPC { } } + /** + * Test RPC timeout. + */ + @Test(timeout=30000) + public void testClientRpcTimeout() throws Exception { + final Server server = new RPC.Builder(conf) + .setProtocol(TestProtocol.class).setInstance(new TestImpl()) + .setBindAddress(ADDRESS).setPort(0) + .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true) + .build(); + server.start(); + + final Configuration conf = new Configuration(); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000); + final TestProtocol proxy = + RPC.getProxy(TestProtocol.class, TestProtocol.versionID, + NetUtils.getConnectAddress(server), conf); + + try { + proxy.sleep(3000); + fail("RPC should time out."); + } catch (SocketTimeoutException e) { + LOG.info("got expected timeout.", e); + } finally { + server.stop(); + RPC.stopProxy(proxy); + } + } + public static void main(String[] args) throws IOException { new TestRPC().testCallsInternal(conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 86422e6cb5d..054d469d8de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -103,10 +103,9 @@ public class DatanodeProtocolClientSideTranslatorPB implements private static DatanodeProtocolPB createNamenode( InetSocketAddress nameNodeAddr, Configuration conf, UserGroupInformation ugi) throws IOException { - return RPC.getProtocolProxy(DatanodeProtocolPB.class, + return RPC.getProxy(DatanodeProtocolPB.class, RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi, - conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class), - org.apache.hadoop.ipc.Client.getPingInterval(conf), null).getProxy(); + conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class)); } @Override