diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 11523b3bfa5..0812c05fe1a 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -5,7 +5,11 @@ Trunk (unreleased changes) IMPROVEMENTS HADOOP-7595. Upgrade dependency to Avro 1.5.3. (Alejandro Abdelnur via atm) - HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia) + + HADOOP-7524. Change RPC to allow multiple protocols including multuple + versions of the same protocol (sanjay Radia) + + HADOOP-7607. Simplify the RPC proxy cleanup process. (atm) BUGS diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java index 5ab379c1da5..1b73351bf16 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java @@ -131,7 +131,7 @@ public class AvroRpcEngine implements RpcEngine { } public void close() throws IOException { - ENGINE.stopProxy(tunnel); + RPC.stopProxy(tunnel); } } @@ -152,15 +152,6 @@ public class AvroRpcEngine implements RpcEngine { false); } - /** Stop this proxy. */ - public void stopProxy(Object proxy) { - try { - ((Invoker)Proxy.getInvocationHandler(proxy)).close(); - } catch (IOException e) { - LOG.warn("Error while stopping "+proxy, e); - } - } - private class Invoker implements InvocationHandler, Closeable { private final ClientTransceiver tx; private final SpecificRequestor requestor; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 453a5dd1750..beb8cc1cfa1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ipc; +import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; import java.lang.reflect.Method; @@ -26,6 +27,7 @@ import java.net.InetSocketAddress; import java.net.NoRouteToHostException; import java.net.SocketTimeoutException; import java.io.*; +import java.io.Closeable; import java.util.Map; import java.util.HashMap; @@ -80,12 +82,8 @@ public class RPC { private RPC() {} // no public ctor // cache of RpcEngines by protocol - private static final Map PROTOCOL_ENGINES - = new HashMap(); - - // track what RpcEngine is used by a proxy class, for stopProxy() - private static final Map PROXY_ENGINES - = new HashMap(); + private static final Map,RpcEngine> PROTOCOL_ENGINES + = new HashMap,RpcEngine>(); private static final String ENGINE_PROP = "rpc.engine"; @@ -96,32 +94,23 @@ public class RPC { * @param engine the RpcEngine impl */ public static void setProtocolEngine(Configuration conf, - Class protocol, Class engine) { + Class protocol, Class engine) { conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class); } // return the RpcEngine configured to handle a protocol - private static synchronized RpcEngine getProtocolEngine(Class protocol, + private static synchronized RpcEngine getProtocolEngine(Class protocol, Configuration conf) { RpcEngine engine = PROTOCOL_ENGINES.get(protocol); if (engine == null) { Class impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), WritableRpcEngine.class); engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf); - if (protocol.isInterface()) - PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(), - protocol), - engine); PROTOCOL_ENGINES.put(protocol, engine); } return engine; } - // return the RpcEngine that handles a proxy object - private static synchronized RpcEngine getProxyEngine(Object proxy) { - return PROXY_ENGINES.get(proxy.getClass()); - } - /** * A version mismatch for the RPC protocol. */ @@ -477,13 +466,30 @@ public class RPC { } /** - * Stop this proxy and release its invoker's resource - * @param proxy the proxy to be stopped + * Stop this proxy and release its invoker's resource by getting the + * invocation handler for the given proxy object and calling + * {@link Closeable#close} if that invocation handler implements + * {@link Closeable}. + * + * @param proxy the RPC proxy object to be stopped */ public static void stopProxy(Object proxy) { - RpcEngine rpcEngine; - if (proxy!=null && (rpcEngine = getProxyEngine(proxy)) != null) { - rpcEngine.stopProxy(proxy); + InvocationHandler invocationHandler = null; + try { + invocationHandler = Proxy.getInvocationHandler(proxy); + } catch (IllegalArgumentException e) { + LOG.error("Tried to call RPC.stopProxy on an object that is not a proxy.", e); + } + if (proxy != null && invocationHandler != null && + invocationHandler instanceof Closeable) { + try { + ((Closeable)invocationHandler).close(); + } catch (IOException e) { + LOG.error("Stopping RPC invocation handler caused exception", e); + } + } else { + LOG.error("Could not get invocation handler " + invocationHandler + + " for proxy " + proxy + ", or invocation handler is not closeable."); } } @@ -532,7 +538,7 @@ public class RPC { } /** Construct a server for a protocol implementation instance. */ - public static Server getServer(Class protocol, + public static Server getServer(Class protocol, Object instance, String bindAddress, int port, Configuration conf) throws IOException { @@ -543,7 +549,7 @@ public class RPC { * @deprecated secretManager should be passed. */ @Deprecated - public static Server getServer(Class protocol, + public static Server getServer(Class protocol, Object instance, String bindAddress, int port, int numHandlers, boolean verbose, Configuration conf) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java index 500cd9537c6..a9076e7d1e4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java @@ -41,9 +41,6 @@ public interface RpcEngine { UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException; - /** Stop this proxy. */ - void stopProxy(Object proxy); - /** Expert: Make multiple, parallel calls to a set of servers. */ Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index b28949d99a4..75b8d51f057 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -30,6 +30,7 @@ import java.io.*; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.io.Closeable; import java.util.Map; import java.util.HashMap; @@ -219,7 +220,7 @@ public class WritableRpcEngine implements RpcEngine { private static ClientCache CLIENTS=new ClientCache(); - private static class Invoker implements InvocationHandler { + private static class Invoker implements InvocationHandler, Closeable { private Client.ConnectionId remoteId; private Client client; private boolean isClosed = false; @@ -250,7 +251,7 @@ public class WritableRpcEngine implements RpcEngine { } /* close the IPC client that's responsible for this invoker's RPCs */ - synchronized private void close() { + synchronized public void close() { if (!isClosed) { isClosed = true; CLIENTS.stopClient(client); @@ -281,15 +282,6 @@ public class WritableRpcEngine implements RpcEngine { factory, rpcTimeout)); return new ProtocolProxy(protocol, proxy, true); } - - /** - * Stop this proxy and release its invoker's resource - * @param proxy the proxy to be stopped - */ - public void stopProxy(Object proxy) { - ((Invoker)Proxy.getInvocationHandler(proxy)).close(); - } - /** Expert: Make multiple, parallel calls to a set of servers. */ public Object[] call(Method method, Object[][] params, diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index fa13a4bd36b..2c880aa152e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -4,7 +4,11 @@ Trunk (unreleased changes) IMPROVEMENTS - MAPREDUCE-2887 due to HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia) + MAPREDUCE-2887 due to HADOOP-7524 Change RPC to allow multiple protocols + including multuple versions of the same protocol (sanjay Radia) + + MAPREDUCE-2934. MR portion of HADOOP-7607 - Simplify the RPC proxy cleanup + process (atm) BUG FIXES MAPREDUCE-2784. [Gridmix] Bug fixes in ExecutionSummarizer and diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java index b6f96597e4a..9a623a1a8a7 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java @@ -73,15 +73,6 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine { addr, ticket, conf, factory, rpcTimeout)), false); } - @Override - public void stopProxy(Object proxy) { - try { - ((Invoker) Proxy.getInvocationHandler(proxy)).close(); - } catch (IOException e) { - LOG.warn("Error while stopping " + proxy, e); - } - } - private static class Invoker implements InvocationHandler, Closeable { private Map returnTypes = new ConcurrentHashMap(); private boolean isClosed = false;