From 7fb55be2d3dd3da4b1da6924cc8715f7291219df Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Sat, 7 Jan 2012 16:57:58 +0000 Subject: [PATCH] svn merge -c 1167318 from trunk for HADOOP-7607. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1228665 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 6 ++- .../org/apache/hadoop/ipc/AvroRpcEngine.java | 11 +--- .../main/java/org/apache/hadoop/ipc/RPC.java | 54 ++++++++++--------- .../java/org/apache/hadoop/ipc/RpcEngine.java | 3 -- .../apache/hadoop/ipc/WritableRpcEngine.java | 14 ++--- 5 files changed, 39 insertions(+), 49 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index c37b2848914..8773371c137 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -2,7 +2,11 @@ Hadoop Change Log Release 0.23-PB - Unreleased - HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia) + IMPROVEMENTS + + HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia) + + HADOOP-7607. Simplify the RPC proxy cleanup process. (atm) Release 0.23.1 - Unreleased diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java index 5ab379c1da5..1b73351bf16 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java @@ -131,7 +131,7 @@ public class AvroRpcEngine implements RpcEngine { } public void close() throws IOException { - ENGINE.stopProxy(tunnel); + RPC.stopProxy(tunnel); } } @@ -152,15 +152,6 @@ public class AvroRpcEngine implements RpcEngine { false); } - /** Stop this proxy. */ - public void stopProxy(Object proxy) { - try { - ((Invoker)Proxy.getInvocationHandler(proxy)).close(); - } catch (IOException e) { - LOG.warn("Error while stopping "+proxy, e); - } - } - private class Invoker implements InvocationHandler, Closeable { private final ClientTransceiver tx; private final SpecificRequestor requestor; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 453a5dd1750..beb8cc1cfa1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ipc; +import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; import java.lang.reflect.Method; @@ -26,6 +27,7 @@ import java.net.InetSocketAddress; import java.net.NoRouteToHostException; import java.net.SocketTimeoutException; import java.io.*; +import java.io.Closeable; import java.util.Map; import java.util.HashMap; @@ -80,12 +82,8 @@ public class RPC { private RPC() {} // no public ctor // cache of RpcEngines by protocol - private static final Map PROTOCOL_ENGINES - = new HashMap(); - - // track what RpcEngine is used by a proxy class, for stopProxy() - private static final Map PROXY_ENGINES - = new HashMap(); + private static final Map,RpcEngine> PROTOCOL_ENGINES + = new HashMap,RpcEngine>(); private static final String ENGINE_PROP = "rpc.engine"; @@ -96,32 +94,23 @@ public class RPC { * @param engine the RpcEngine impl */ public static void setProtocolEngine(Configuration conf, - Class protocol, Class engine) { + Class protocol, Class engine) { conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class); } // return the RpcEngine configured to handle a protocol - private static synchronized RpcEngine getProtocolEngine(Class protocol, + private static synchronized RpcEngine getProtocolEngine(Class protocol, Configuration conf) { RpcEngine engine = PROTOCOL_ENGINES.get(protocol); if (engine == null) { Class impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), WritableRpcEngine.class); engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf); - if (protocol.isInterface()) - PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(), - protocol), - engine); PROTOCOL_ENGINES.put(protocol, engine); } return engine; } - // return the RpcEngine that handles a proxy object - private static synchronized RpcEngine getProxyEngine(Object proxy) { - return PROXY_ENGINES.get(proxy.getClass()); - } - /** * A version mismatch for the RPC protocol. */ @@ -477,13 +466,30 @@ public class RPC { } /** - * Stop this proxy and release its invoker's resource - * @param proxy the proxy to be stopped + * Stop this proxy and release its invoker's resource by getting the + * invocation handler for the given proxy object and calling + * {@link Closeable#close} if that invocation handler implements + * {@link Closeable}. + * + * @param proxy the RPC proxy object to be stopped */ public static void stopProxy(Object proxy) { - RpcEngine rpcEngine; - if (proxy!=null && (rpcEngine = getProxyEngine(proxy)) != null) { - rpcEngine.stopProxy(proxy); + InvocationHandler invocationHandler = null; + try { + invocationHandler = Proxy.getInvocationHandler(proxy); + } catch (IllegalArgumentException e) { + LOG.error("Tried to call RPC.stopProxy on an object that is not a proxy.", e); + } + if (proxy != null && invocationHandler != null && + invocationHandler instanceof Closeable) { + try { + ((Closeable)invocationHandler).close(); + } catch (IOException e) { + LOG.error("Stopping RPC invocation handler caused exception", e); + } + } else { + LOG.error("Could not get invocation handler " + invocationHandler + + " for proxy " + proxy + ", or invocation handler is not closeable."); } } @@ -532,7 +538,7 @@ public class RPC { } /** Construct a server for a protocol implementation instance. */ - public static Server getServer(Class protocol, + public static Server getServer(Class protocol, Object instance, String bindAddress, int port, Configuration conf) throws IOException { @@ -543,7 +549,7 @@ public class RPC { * @deprecated secretManager should be passed. */ @Deprecated - public static Server getServer(Class protocol, + public static Server getServer(Class protocol, Object instance, String bindAddress, int port, int numHandlers, boolean verbose, Configuration conf) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java index 500cd9537c6..a9076e7d1e4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java @@ -41,9 +41,6 @@ public interface RpcEngine { UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException; - /** Stop this proxy. */ - void stopProxy(Object proxy); - /** Expert: Make multiple, parallel calls to a set of servers. */ Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index b28949d99a4..75b8d51f057 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -30,6 +30,7 @@ import java.io.*; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.io.Closeable; import java.util.Map; import java.util.HashMap; @@ -219,7 +220,7 @@ public class WritableRpcEngine implements RpcEngine { private static ClientCache CLIENTS=new ClientCache(); - private static class Invoker implements InvocationHandler { + private static class Invoker implements InvocationHandler, Closeable { private Client.ConnectionId remoteId; private Client client; private boolean isClosed = false; @@ -250,7 +251,7 @@ public class WritableRpcEngine implements RpcEngine { } /* close the IPC client that's responsible for this invoker's RPCs */ - synchronized private void close() { + synchronized public void close() { if (!isClosed) { isClosed = true; CLIENTS.stopClient(client); @@ -281,15 +282,6 @@ public class WritableRpcEngine implements RpcEngine { factory, rpcTimeout)); return new ProtocolProxy(protocol, proxy, true); } - - /** - * Stop this proxy and release its invoker's resource - * @param proxy the proxy to be stopped - */ - public void stopProxy(Object proxy) { - ((Invoker)Proxy.getInvocationHandler(proxy)).close(); - } - /** Expert: Make multiple, parallel calls to a set of servers. */ public Object[] call(Method method, Object[][] params,