From 714e5f7165b101f72a43e8c3fb27be5def93fe42 Mon Sep 17 00:00:00 2001 From: Hairong Kuang Date: Thu, 5 Aug 2010 16:39:10 +0000 Subject: [PATCH] HADOOP-6889. Make RPC to have an option to timeout. Contributed by Hairong Kuang. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@982681 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 + .../org/apache/hadoop/ipc/AvroRpcEngine.java | 22 ++++--- src/java/org/apache/hadoop/ipc/Client.java | 45 +++++++++---- src/java/org/apache/hadoop/ipc/RPC.java | 64 +++++++++++++++---- src/java/org/apache/hadoop/ipc/RpcEngine.java | 6 +- .../apache/hadoop/ipc/WritableRpcEngine.java | 24 ++++--- .../core/org/apache/hadoop/ipc/TestIPC.java | 36 +++++++++-- 7 files changed, 148 insertions(+), 52 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index fdbbf1940d6..4930d870f08 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -33,6 +33,8 @@ Trunk (unreleased changes) HADOOP-6892. Common component of HDFS-1150 (Verify datanodes' identities to clients in secure clusters) (jghoman) + HADOOP-6889. Make RPC to have an option to timeout. (hairong) + IMPROVEMENTS HADOOP-6644. util.Shell getGROUPS_FOR_USER_COMMAND method name @@ -105,6 +107,7 @@ Trunk (unreleased changes) periodically. (Owen O'Malley and ddas via ddas) HADOOP-6890. Improve listFiles API introduced by HADOOP-6870. 
(hairong) + OPTIMIZATIONS BUG FIXES diff --git a/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java b/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java index 4ecfb8a19c9..7f6856bffa2 100644 --- a/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java +++ b/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java @@ -98,11 +98,13 @@ private static class ClientTransceiver extends Transceiver { public ClientTransceiver(InetSocketAddress addr, UserGroupInformation ticket, - Configuration conf, SocketFactory factory) + Configuration conf, SocketFactory factory, + int rpcTimeout) throws IOException { this.tunnel = (TunnelProtocol)ENGINE.getProxy(TunnelProtocol.class, VERSION, - addr, ticket, conf, factory); + addr, ticket, conf, factory, + rpcTimeout); this.remote = addr; } @@ -128,14 +130,15 @@ public void close() throws IOException { /** Construct a client-side proxy object that implements the named protocol, * talking to a server at the named address. */ - public Object getProxy(Class protocol, long clientVersion, + public Object getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, - Configuration conf, SocketFactory factory) + Configuration conf, SocketFactory factory, + int rpcTimeout) throws IOException { return Proxy.newProxyInstance (protocol.getClassLoader(), new Class[] { protocol }, - new Invoker(protocol, addr, ticket, conf, factory)); + new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); } /** Stop this proxy. 
*/ @@ -152,8 +155,9 @@ private static class Invoker implements InvocationHandler, Closeable { private final ReflectRequestor requestor; public Invoker(Class protocol, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory) throws IOException { - this.tx = new ClientTransceiver(addr, ticket, conf, factory); + SocketFactory factory, + int rpcTimeout) throws IOException { + this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout); this.requestor = new ReflectRequestor(protocol, tx); } @Override public Object invoke(Object proxy, Method method, Object[] args) @@ -169,7 +173,7 @@ public void close() throws IOException { private static class TunnelResponder extends ReflectResponder implements TunnelProtocol { - public TunnelResponder(Class iface, Object impl) { + public TunnelResponder(Class<?> iface, Object impl) { super(iface, impl); } @@ -192,7 +196,7 @@ public Object[] call(Method method, Object[][] params, /** Construct a server for a protocol implementation instance listening on a * port and address. 
*/ - public RPC.Server getServer(Class iface, Object impl, String bindAddress, + public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress, int port, int numHandlers, boolean verbose, Configuration conf, SecretManager secretManager diff --git a/src/java/org/apache/hadoop/ipc/Client.java b/src/java/org/apache/hadoop/ipc/Client.java index 87356017c09..f2da724dd1d 100644 --- a/src/java/org/apache/hadoop/ipc/Client.java +++ b/src/java/org/apache/hadoop/ipc/Client.java @@ -219,6 +219,7 @@ private class Connection extends Thread { private Socket socket = null; // connected socket private DataInputStream in; private DataOutputStream out; + private int rpcTimeout; // currently active calls private Hashtable calls = new Hashtable(); @@ -233,7 +234,7 @@ public Connection(ConnectionId remoteId) throws IOException { throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName()); } - + this.rpcTimeout = remoteId.getRpcTimeout(); UserGroupInformation ticket = remoteId.getTicket(); Class protocol = remoteId.getProtocol(); this.useSasl = UserGroupInformation.isSecurityEnabled(); @@ -321,11 +322,13 @@ protected PingInputStream(InputStream in) { } /* Process timeout exception - * if the connection is not going to be closed, send a ping. + * if the connection is not going to be closed and + * is not configured to have an RPC timeout, send a ping. + * (if rpcTimeout is not set to 0, the RPC should time out.) * otherwise, throw the timeout exception. 
*/ private void handleTimeout(SocketTimeoutException e) throws IOException { - if (shouldCloseConnection.get() || !running.get()) { + if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) { throw e; } else { sendPing(); @@ -405,6 +408,9 @@ private synchronized void setupConnection() throws IOException { this.socket.setTcpNoDelay(tcpNoDelay); // connection time out is 20s NetUtils.connect(this.socket, remoteId.getAddress(), 20000); + if (rpcTimeout > 0) { + pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval + } this.socket.setSoTimeout(pingInterval); return; } catch (SocketTimeoutException toe) { @@ -952,7 +958,7 @@ public Writable call(Writable param, InetSocketAddress address) public Writable call(Writable param, InetSocketAddress addr, UserGroupInformation ticket) throws InterruptedException, IOException { - return call(param, addr, null, ticket); + return call(param, addr, null, ticket, 0); } /** Make a call, passing param, to the IPC server running at @@ -961,10 +967,12 @@ public Writable call(Writable param, InetSocketAddress addr, * Throws exceptions if there are network problems or if the remote code * threw an exception. 
*/ public Writable call(Writable param, InetSocketAddress addr, - Class protocol, UserGroupInformation ticket) + Class protocol, UserGroupInformation ticket, + int rpcTimeout) throws InterruptedException, IOException { Call call = new Call(param); - Connection connection = getConnection(addr, protocol, ticket, call); + Connection connection = getConnection( + addr, protocol, ticket, rpcTimeout, call); connection.sendParam(call); // send the parameter boolean interrupted = false; synchronized (call) { @@ -1054,7 +1062,7 @@ public Writable[] call(Writable[] params, InetSocketAddress[] addresses, ParallelCall call = new ParallelCall(params[i], results, i); try { Connection connection = - getConnection(addresses[i], protocol, ticket, call); + getConnection(addresses[i], protocol, ticket, 0, call); connection.sendParam(call); // send each parameter } catch (IOException e) { // log errors @@ -1078,6 +1086,7 @@ public Writable[] call(Writable[] params, InetSocketAddress[] addresses, private Connection getConnection(InetSocketAddress addr, Class protocol, UserGroupInformation ticket, + int rpcTimeout, Call call) throws IOException, InterruptedException { if (!running.get()) { @@ -1089,7 +1098,8 @@ private Connection getConnection(InetSocketAddress addr, * connectionsId object and with set() method. We need to manage the * refs for keys in HashMap properly. For now its ok. 
*/ - ConnectionId remoteId = new ConnectionId(addr, protocol, ticket); + ConnectionId remoteId = new ConnectionId( + addr, protocol, ticket, rpcTimeout); do { synchronized (connections) { connection = connections.get(remoteId); @@ -1117,12 +1127,14 @@ private static class ConnectionId { UserGroupInformation ticket; Class protocol; private static final int PRIME = 16777619; + private int rpcTimeout; ConnectionId(InetSocketAddress address, Class protocol, - UserGroupInformation ticket) { + UserGroupInformation ticket, int rpcTimeout) { this.protocol = protocol; this.address = address; this.ticket = ticket; + this.rpcTimeout = rpcTimeout; } InetSocketAddress getAddress() { @@ -1137,6 +1149,9 @@ UserGroupInformation getTicket() { return ticket; } + private int getRpcTimeout() { + return rpcTimeout; + } @Override public boolean equals(Object obj) { @@ -1144,15 +1159,19 @@ public boolean equals(Object obj) { ConnectionId id = (ConnectionId) obj; return address.equals(id.address) && protocol == id.protocol && ((ticket != null && ticket.equals(id.ticket)) || - (ticket == id.ticket)); + (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout; } return false; } - @Override + @Override // simply use the default Object#hashcode() ? public int hashCode() { - return (address.hashCode() + PRIME * System.identityHashCode(protocol)) ^ - (ticket == null ? 
0 : ticket.hashCode()); + return (address.hashCode() + PRIME * ( + PRIME * ( + PRIME * System.identityHashCode(protocol) ^ + System.identityHashCode(ticket) + ) ^ System.identityHashCode(rpcTimeout) + )); } } } diff --git a/src/java/org/apache/hadoop/ipc/RPC.java b/src/java/org/apache/hadoop/ipc/RPC.java index 7471e1ea69f..8c7db90a660 100644 --- a/src/java/org/apache/hadoop/ipc/RPC.java +++ b/src/java/org/apache/hadoop/ipc/RPC.java @@ -156,7 +156,7 @@ public long getServerVersion() { } public static Object waitForProxy( - Class protocol, + Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf @@ -170,18 +170,37 @@ public static Object waitForProxy( * @param clientVersion client version * @param addr remote address * @param conf configuration to use - * @param timeout time in milliseconds before giving up + * @param connTimeout time in milliseconds before giving up * @return the proxy * @throws IOException if the far end through a RemoteException */ - public static Object waitForProxy(Class protocol, long clientVersion, + public static Object waitForProxy(Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf, - long timeout) throws IOException { + long connTimeout) throws IOException { + return waitForProxy(protocol, clientVersion, addr, conf, 0, connTimeout); + } + /** + * Get a proxy connection to a remote server + * @param protocol protocol class + * @param clientVersion client version + * @param addr remote address + * @param conf configuration to use + * @param rpcTimeout timeout for each RPC + * @param timeout time in milliseconds before giving up + * @return the proxy + * @throws IOException if the far end through a RemoteException + */ + public static Object waitForProxy(Class protocol, long clientVersion, + InetSocketAddress addr, Configuration conf, + int rpcTimeout, + long timeout) throws IOException { long startTime = System.currentTimeMillis(); IOException ioe; while (true) { try { - return 
getProxy(protocol, clientVersion, addr, conf); + return getProxy(protocol, clientVersion, addr, + UserGroupInformation.getCurrentUser(), conf, NetUtils + .getDefaultSocketFactory(conf), rpcTimeout); } catch(ConnectException se) { // namenode has not been started LOG.info("Server at " + addr + " not available yet, Zzzzz..."); ioe = se; @@ -208,7 +227,7 @@ public static Object waitForProxy(Class protocol, long clientVersion, /** Construct a client-side proxy object that implements the named protocol, * talking to a server at the named address. */ - public static Object getProxy(Class protocol, long clientVersion, + public static Object getProxy(Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf, SocketFactory factory) throws IOException { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); @@ -217,16 +236,39 @@ public static Object getProxy(Class protocol, long clientVersion, /** Construct a client-side proxy object that implements the named protocol, * talking to a server at the named address. */ - public static Object getProxy(Class protocol, long clientVersion, + public static Object getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory) throws IOException { + SocketFactory factory) throws IOException { + return getProxy(protocol, clientVersion, addr, ticket, conf, factory, 0); + } + + /** + * Construct a client-side proxy that implements the named protocol, + * talking to a server at the named address. 
+ * + * @param protocol protocol + * @param clientVersion client's version + * @param addr server address + * @param ticket security ticket + * @param conf configuration + * @param factory socket factory + * @param rpcTimeout max time for each rpc; 0 means no timeout + * @return the proxy + * @throws IOException if any error occurs + */ + public static Object getProxy(Class protocol, long clientVersion, + InetSocketAddress addr, + UserGroupInformation ticket, + Configuration conf, + SocketFactory factory, + int rpcTimeout) throws IOException { if (UserGroupInformation.isSecurityEnabled()) { SaslRpcServer.init(conf); } - return getProtocolEngine(protocol,conf) - .getProxy(protocol, clientVersion, addr, ticket, conf, factory); + return getProtocolEngine(protocol,conf).getProxy(protocol, + clientVersion, addr, ticket, conf, factory, rpcTimeout); } /** @@ -239,7 +281,7 @@ public static Object getProxy(Class protocol, long clientVersion, * @return a proxy instance * @throws IOException */ - public static Object getProxy(Class protocol, long clientVersion, + public static Object getProxy(Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException { diff --git a/src/java/org/apache/hadoop/ipc/RpcEngine.java b/src/java/org/apache/hadoop/ipc/RpcEngine.java index 0bb87a20eaa..dd7a4f35241 100644 --- a/src/java/org/apache/hadoop/ipc/RpcEngine.java +++ b/src/java/org/apache/hadoop/ipc/RpcEngine.java @@ -32,10 +32,10 @@ interface RpcEngine { /** Construct a client-side proxy object. */ - Object getProxy(Class protocol, + Object getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory) throws IOException; + SocketFactory factory, int rpcTimeout) throws IOException; /** Stop this proxy. 
*/ void stopProxy(Object proxy); @@ -46,7 +46,7 @@ Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, throws IOException, InterruptedException; /** Construct a server for a protocol implementation instance. */ - RPC.Server getServer(Class protocol, Object instance, String bindAddress, + RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress, int port, int numHandlers, boolean verbose, Configuration conf, SecretManager secretManager diff --git a/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 518625e604c..030dc8ee08f 100644 --- a/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -48,7 +48,7 @@ class WritableRpcEngine implements RpcEngine { /** A method invocation, including the method name and its parameters.*/ private static class Invocation implements Writable, Configurable { private String methodName; - private Class[] parameterClasses; + private Class<?>[] parameterClasses; private Object[] parameters; private Configuration conf; @@ -64,7 +64,7 @@ public Invocation(Method method, Object[] parameters) { public String getMethodName() { return methodName; } /** The parameter classes. */ - public Class[] getParameterClasses() { return parameterClasses; } + public Class<?>[] getParameterClasses() { return parameterClasses; } /** The parameter instances. 
*/ public Object[] getParameters() { return parameters; } @@ -172,18 +172,21 @@ private void stopClient(Client client) { private static ClientCache CLIENTS=new ClientCache(); private static class Invoker implements InvocationHandler { - private Class protocol; + private Class<?> protocol; private InetSocketAddress address; private UserGroupInformation ticket; + private int rpcTimeout; private Client client; private boolean isClosed = false; - public Invoker(Class protocol, + public Invoker(Class<?> protocol, InetSocketAddress address, UserGroupInformation ticket, - Configuration conf, SocketFactory factory) { + Configuration conf, SocketFactory factory, + int rpcTimeout) { this.protocol = protocol; this.address = address; this.ticket = ticket; + this.rpcTimeout = rpcTimeout; this.client = CLIENTS.getClient(conf, factory); } @@ -197,7 +200,7 @@ public Object invoke(Object proxy, Method method, Object[] args) ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), address, - protocol, ticket); + protocol, ticket, rpcTimeout); if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); @@ -216,14 +219,15 @@ synchronized private void close() { /** Construct a client-side proxy object that implements the named protocol, * talking to a server at the named address. 
*/ - public Object getProxy(Class protocol, long clientVersion, + public Object getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, - Configuration conf, SocketFactory factory) + Configuration conf, SocketFactory factory, + int rpcTimeout) throws IOException { Object proxy = Proxy.newProxyInstance (protocol.getClassLoader(), new Class[] { protocol }, - new Invoker(protocol, addr, ticket, conf, factory)); + new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); if (proxy instanceof VersionedProtocol) { long serverVersion = ((VersionedProtocol)proxy) .getProtocolVersion(protocol.getName(), clientVersion); @@ -276,7 +280,7 @@ public Object[] call(Method method, Object[][] params, /** Construct a server for a protocol implementation instance listening on a * port and address. */ - public Server getServer(Class protocol, + public Server getServer(Class protocol, Object instance, String bindAddress, int port, int numHandlers, boolean verbose, Configuration conf, SecretManager secretManager) diff --git a/src/test/core/org/apache/hadoop/ipc/TestIPC.java b/src/test/core/org/apache/hadoop/ipc/TestIPC.java index f9b48e6a70d..4ef3f204a04 100644 --- a/src/test/core/org/apache/hadoop/ipc/TestIPC.java +++ b/src/test/core/org/apache/hadoop/ipc/TestIPC.java @@ -29,6 +29,7 @@ import java.io.DataInput; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; import javax.net.SocketFactory; import junit.framework.TestCase; @@ -43,6 +44,7 @@ public class TestIPC extends TestCase { final private static Configuration conf = new Configuration(); final static private int PING_INTERVAL = 1000; + final static private int MIN_SLEEP_TIME = 1000; static { Client.setPingInterval(conf, PING_INTERVAL); @@ -66,8 +68,9 @@ public TestServer(int handlerCount, boolean sleep) public Writable call(Class protocol, Writable param, long receiveTime) throws IOException { if (sleep) { + // sleep a bit try { 
- Thread.sleep(RANDOM.nextInt(2*PING_INTERVAL)); // sleep a bit + Thread.sleep(RANDOM.nextInt(PING_INTERVAL) + MIN_SLEEP_TIME); } catch (InterruptedException e) {} } return param; // echo param as result @@ -91,7 +94,7 @@ public void run() { try { LongWritable param = new LongWritable(RANDOM.nextLong()); LongWritable value = - (LongWritable)client.call(param, server, null, null); + (LongWritable)client.call(param, server, null, null, 0); if (!param.equals(value)) { LOG.fatal("Call failed!"); failed = true; @@ -142,6 +145,7 @@ public void run() { public void testSerial() throws Exception { testSerial(3, false, 2, 5, 100); + testSerial(3, true, 2, 5, 10); } public void testSerial(int handlerCount, boolean handlerSleep, @@ -219,7 +223,7 @@ public void testStandAloneClient() throws Exception { InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10); try { client.call(new LongWritable(RANDOM.nextLong()), - address, null, null); + address, null, null, 0); fail("Expected an exception to have been thrown"); } catch (IOException e) { String message = e.getMessage(); @@ -276,7 +280,7 @@ public void testErrorClient() throws Exception { Client client = new Client(LongErrorWritable.class, conf); try { client.call(new LongErrorWritable(RANDOM.nextLong()), - addr, null, null); + addr, null, null, 0); fail("Expected an exception to have been thrown"); } catch (IOException e) { // check error @@ -296,7 +300,7 @@ public void testRuntimeExceptionWritable() throws Exception { Client client = new Client(LongRTEWritable.class, conf); try { client.call(new LongRTEWritable(RANDOM.nextLong()), - addr, null, null); + addr, null, null, 0); fail("Expected an exception to have been thrown"); } catch (IOException e) { // check error @@ -322,14 +326,34 @@ public void testSocketFactoryException() throws Exception { InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10); try { client.call(new LongWritable(RANDOM.nextLong()), - address, null, null); + address, null, null, 0); 
fail("Expected an exception to have been thrown"); } catch (IOException e) { assertTrue(e.getMessage().contains("Injected fault")); } } + public void testIpcTimeout() throws Exception { + // start server + Server server = new TestServer(1, true); + InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + // start client + Client client = new Client(LongWritable.class, conf); + // set timeout to be less than MIN_SLEEP_TIME + try { + client.call(new LongWritable(RANDOM.nextLong()), + addr, null, null, MIN_SLEEP_TIME/2); + fail("Expected an exception to have been thrown"); + } catch (SocketTimeoutException e) { + LOG.info("Get a SocketTimeoutException ", e); + } + // set timeout to be bigger than 3*ping interval + client.call(new LongWritable(RANDOM.nextLong()), + addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME); + } + public static void main(String[] args) throws Exception { //new TestIPC("test").testSerial(5, false, 2, 10, 1000);