diff --git a/CHANGES.txt b/CHANGES.txt index 0fb89cd2570..8b40a91c79a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -49,6 +49,9 @@ Trunk (unreleased changes) HADOOP-6346. Add support for specifying unpack pattern regex to RunJar.unJar. (Todd Lipcon via tomwhite) + HADOOP-6422. Make RPC backend plugable, protocol-by-protocol, to + ease evolution towards Avro. (cutting) + OPTIMIZATIONS BUG FIXES diff --git a/src/java/org/apache/hadoop/ipc/AvroRpc.java b/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java similarity index 66% rename from src/java/org/apache/hadoop/ipc/AvroRpc.java rename to src/java/org/apache/hadoop/ipc/AvroRpcEngine.java index 80901de0b04..3f23f6f4d68 100644 --- a/src/java/org/apache/hadoop/ipc/AvroRpc.java +++ b/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java @@ -28,6 +28,8 @@ import javax.net.SocketFactory; import javax.security.auth.login.LoginException; +import org.apache.commons.logging.*; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.security.UserGroupInformation; @@ -41,9 +43,14 @@ * does not give cross-language wire compatibility, since the Hadoop RPC wire * format is non-standard, but it does permit use of Avro's protocol versioning * features for inter-Java RPCs. */ -public class AvroRpc { +class AvroRpcEngine implements RpcEngine { + private static final Log LOG = LogFactory.getLog(RPC.class); + private static int VERSION = 0; + // the implementation we tunnel through + private static final RpcEngine ENGINE = new WritableRpcEngine(); + /** Tunnel an Avro RPC request and response through Hadoop's RPC. */ private static interface TunnelProtocol extends VersionedProtocol { /** All Avro methods and responses go through this. */ @@ -91,8 +98,9 @@ public ClientTransceiver(InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory) throws IOException { - this.tunnel = (TunnelProtocol)RPC.getProxy(TunnelProtocol.class, VERSION, - addr, ticket, conf, factory); + this.tunnel = + (TunnelProtocol)ENGINE.getProxy(TunnelProtocol.class, VERSION, + addr, ticket, conf, factory); this.remote = addr; } @@ -111,44 +119,48 @@ public void writeBuffers(List buffers) throws IOException { throw new UnsupportedOperationException(); } - public void close() throws IOException {} - } - - /** Construct a client-side proxy object that implements the named protocol, - * talking to a server at the named address. */ - public static Object getProxy(Class protocol, - InetSocketAddress addr, - Configuration conf) - throws IOException { - UserGroupInformation ugi = null; - try { - ugi = UserGroupInformation.login(conf); - } catch (LoginException le) { - throw new RuntimeException("Couldn't login!"); + public void close() throws IOException { + ENGINE.stopProxy(tunnel); } - return getProxy(protocol, addr, ugi, conf, - NetUtils.getDefaultSocketFactory(conf)); } /** Construct a client-side proxy object that implements the named protocol, * talking to a server at the named address. */ - public static Object getProxy - (final Class protocol, final InetSocketAddress addr, - final UserGroupInformation ticket, - final Configuration conf, final SocketFactory factory) + public Object getProxy(Class protocol, long clientVersion, + InetSocketAddress addr, UserGroupInformation ticket, + Configuration conf, SocketFactory factory) throws IOException { - return Proxy.newProxyInstance - (protocol.getClassLoader(), new Class[] { protocol }, - new InvocationHandler() { - public Object invoke(Object proxy, Method method, Object[] args) - throws Throwable { - return new ReflectRequestor - (protocol, - new ClientTransceiver(addr, ticket, conf, factory)) - .invoke(proxy, method, args); - } - }); + (protocol.getClassLoader(), + new Class[] { protocol }, + new Invoker(protocol, addr, ticket, conf, factory)); + } + + /** Stop this proxy. */ + public void stopProxy(Object proxy) { + try { + ((Invoker)Proxy.getInvocationHandler(proxy)).close(); + } catch (IOException e) { + LOG.warn("Error while stopping "+proxy, e); + } + } + + private static class Invoker implements InvocationHandler, Closeable { + private final ClientTransceiver tx; + private final ReflectRequestor requestor; + public Invoker(Class protocol, InetSocketAddress addr, + UserGroupInformation ticket, Configuration conf, + SocketFactory factory) throws IOException { + this.tx = new ClientTransceiver(addr, ticket, conf, factory); + this.requestor = new ReflectRequestor(protocol, tx); + } + @Override public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + return requestor.invoke(proxy, method, args); + } + public void close() throws IOException { + tx.close(); + } } /** An Avro RPC Responder that can process requests passed via Hadoop RPC. */ @@ -170,24 +182,20 @@ public BufferListWritable call(final BufferListWritable request) } } - /** Construct a server for a protocol implementation instance listening on a - * port and address. */ - public static Server getServer(Object impl, String bindAddress, int port, - Configuration conf) - throws IOException { - return RPC.getServer(new TunnelResponder(impl.getClass(), impl), - bindAddress, port, conf); - + public Object[] call(Method method, Object[][] params, + InetSocketAddress[] addrs, UserGroupInformation ticket, + Configuration conf) throws IOException { + throw new UnsupportedOperationException(); } /** Construct a server for a protocol implementation instance listening on a * port and address. */ - public static RPC.Server getServer(Object impl, String bindAddress, int port, - int numHandlers, boolean verbose, - Configuration conf) - throws IOException { - return RPC.getServer(new TunnelResponder(impl.getClass(), impl), - bindAddress, port, numHandlers, verbose, conf); + public RPC.Server getServer(Class iface, Object impl, String bindAddress, + int port, int numHandlers, boolean verbose, + Configuration conf) throws IOException { + return ENGINE.getServer(TunnelProtocol.class, + new TunnelResponder(iface, impl), + bindAddress, port, numHandlers, verbose, conf); } } diff --git a/src/java/org/apache/hadoop/ipc/RPC.java b/src/java/org/apache/hadoop/ipc/RPC.java index ad2a03fef5e..c5e98ec66a6 100644 --- a/src/java/org/apache/hadoop/ipc/RPC.java +++ b/src/java/org/apache/hadoop/ipc/RPC.java @@ -20,9 +20,6 @@ import java.lang.reflect.Proxy; import java.lang.reflect.Method; -import java.lang.reflect.Array; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; import java.net.ConnectException; import java.net.InetSocketAddress; @@ -32,7 +29,6 @@ import java.util.HashMap; import javax.net.SocketFactory; -import javax.security.auth.Subject; import javax.security.auth.login.LoginException; import org.apache.commons.logging.*; @@ -44,6 +40,7 @@ import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.hadoop.conf.*; import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate; +import org.apache.hadoop.util.ReflectionUtils; /** A simple RPC mechanism. * @@ -64,185 +61,55 @@ * the protocol instance is transmitted. */ public class RPC { - private static final Log LOG = - LogFactory.getLog(RPC.class); + private static final Log LOG = LogFactory.getLog(RPC.class); private RPC() {} // no public ctor + // cache of RpcEngines by protocol + private static final Map PROTOCOL_ENGINES + = new HashMap(); - /** A method invocation, including the method name and its parameters.*/ - private static class Invocation implements Writable, Configurable { - private String methodName; - private Class[] parameterClasses; - private Object[] parameters; - private Configuration conf; + // track what RpcEngine is used by a proxy class, for stopProxy() + private static final Map PROXY_ENGINES + = new HashMap(); - public Invocation() {} - - public Invocation(Method method, Object[] parameters) { - this.methodName = method.getName(); - this.parameterClasses = method.getParameterTypes(); - this.parameters = parameters; - } - - /** The name of the method invoked. */ - public String getMethodName() { return methodName; } - - /** The parameter classes. */ - public Class[] getParameterClasses() { return parameterClasses; } - - /** The parameter instances. */ - public Object[] getParameters() { return parameters; } - - public void readFields(DataInput in) throws IOException { - methodName = UTF8.readString(in); - parameters = new Object[in.readInt()]; - parameterClasses = new Class[parameters.length]; - ObjectWritable objectWritable = new ObjectWritable(); - for (int i = 0; i < parameters.length; i++) { - parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf); - parameterClasses[i] = objectWritable.getDeclaredClass(); - } - } - - public void write(DataOutput out) throws IOException { - UTF8.writeString(out, methodName); - out.writeInt(parameterClasses.length); - for (int i = 0; i < parameterClasses.length; i++) { - ObjectWritable.writeObject(out, parameters[i], parameterClasses[i], - conf); - } - } - - public String toString() { - StringBuffer buffer = new StringBuffer(); - buffer.append(methodName); - buffer.append("("); - for (int i = 0; i < parameters.length; i++) { - if (i != 0) - buffer.append(", "); - buffer.append(parameters[i]); - } - buffer.append(")"); - return buffer.toString(); - } - - public void setConf(Configuration conf) { - this.conf = conf; - } - - public Configuration getConf() { - return this.conf; - } + private static final String ENGINE_PROP = "rpc.engine"; + // set a protocol to use a non-default RpcEngine + static void setProtocolEngine(Configuration conf, + Class protocol, Class engine) { + conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class); } - /* Cache a client using its socket factory as the hash key */ - static private class ClientCache { - private Map clients = - new HashMap(); - - /** - * Construct & cache an IPC client with the user-provided SocketFactory - * if no cached client exists. - * - * @param conf Configuration - * @return an IPC client - */ - private synchronized Client getClient(Configuration conf, - SocketFactory factory) { - // Construct & cache client. The configuration is only used for timeout, - // and Clients have connection pools. So we can either (a) lose some - // connection pooling and leak sockets, or (b) use the same timeout for all - // configurations. Since the IPC is usually intended globally, not - // per-job, we choose (a). - Client client = clients.get(factory); - if (client == null) { - client = new Client(ObjectWritable.class, conf, factory); - clients.put(factory, client); - } else { - client.incCount(); - } - return client; - } - - /** - * Construct & cache an IPC client with the default SocketFactory - * if no cached client exists. - * - * @param conf Configuration - * @return an IPC client - */ - private synchronized Client getClient(Configuration conf) { - return getClient(conf, SocketFactory.getDefault()); - } - - /** - * Stop a RPC client connection - * A RPC client is closed only when its reference count becomes zero. - */ - private void stopClient(Client client) { - synchronized (this) { - client.decCount(); - if (client.isZeroReference()) { - clients.remove(client.getSocketFactory()); - } - } - if (client.isZeroReference()) { - client.stop(); - } + // return the RpcEngine configured to handle a protocol + private static synchronized RpcEngine getProtocolEngine(Class protocol, + Configuration conf) { + RpcEngine engine = PROTOCOL_ENGINES.get(protocol); + if (engine == null) { + Class impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), + WritableRpcEngine.class); + LOG.info("Using "+impl.getName()+" for "+protocol.getName()); + engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf); + if (protocol.isInterface()) + PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(), + protocol), + engine); + PROTOCOL_ENGINES.put(protocol, engine); } + return engine; } - private static ClientCache CLIENTS=new ClientCache(); - - private static class Invoker implements InvocationHandler { - private Class protocol; - private InetSocketAddress address; - private UserGroupInformation ticket; - private Client client; - private boolean isClosed = false; - - public Invoker(Class protocol, - InetSocketAddress address, UserGroupInformation ticket, - Configuration conf, SocketFactory factory) { - this.protocol = protocol; - this.address = address; - this.ticket = ticket; - this.client = CLIENTS.getClient(conf, factory); - } - - public Object invoke(Object proxy, Method method, Object[] args) - throws Throwable { - final boolean logDebug = LOG.isDebugEnabled(); - long startTime = 0; - if (logDebug) { - startTime = System.currentTimeMillis(); - } - - ObjectWritable value = (ObjectWritable) - client.call(new Invocation(method, args), address, - protocol, ticket); - if (logDebug) { - long callTime = System.currentTimeMillis() - startTime; - LOG.debug("Call: " + method.getName() + " " + callTime); - } - return value.get(); - } - - /* close the IPC client that's responsible for this invoker's RPCs */ - synchronized private void close() { - if (!isClosed) { - isClosed = true; - CLIENTS.stopClient(client); - } - } + // return the RpcEngine that handles a proxy object + private static synchronized RpcEngine getProxyEngine(Object proxy) { + return PROXY_ENGINES.get(proxy.getClass()); } /** * A version mismatch for the RPC protocol. */ public static class VersionMismatch extends IOException { + private static final long serialVersionUID = 0; + private String interfaceName; private long clientVersion; private long serverVersion; @@ -286,8 +153,8 @@ public long getServerVersion() { } } - public static VersionedProtocol waitForProxy( - Class protocol, + public static Object waitForProxy( + Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf @@ -305,13 +172,9 @@ public static VersionedProtocol waitForProxy( * @return the proxy * @throws IOException if the far end through a RemoteException */ - static VersionedProtocol waitForProxy( - Class protocol, - long clientVersion, - InetSocketAddress addr, - Configuration conf, - long timeout - ) throws IOException { + static Object waitForProxy(Class protocol, long clientVersion, + InetSocketAddress addr, Configuration conf, + long timeout) throws IOException { long startTime = System.currentTimeMillis(); IOException ioe; while (true) { @@ -337,12 +200,12 @@ static VersionedProtocol waitForProxy( } } } + /** Construct a client-side proxy object that implements the named protocol, * talking to a server at the named address. */ - public static VersionedProtocol getProxy( - Class protocol, - long clientVersion, InetSocketAddress addr, Configuration conf, - SocketFactory factory) throws IOException { + public static Object getProxy(Class protocol, long clientVersion, + InetSocketAddress addr, Configuration conf, + SocketFactory factory) throws IOException { UserGroupInformation ugi = null; try { ugi = UserGroupInformation.login(conf); @@ -354,23 +217,13 @@ public static VersionedProtocol getProxy( /** Construct a client-side proxy object that implements the named protocol, * talking to a server at the named address. */ - public static VersionedProtocol getProxy( - Class protocol, - long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, - Configuration conf, SocketFactory factory) throws IOException { - - VersionedProtocol proxy = - (VersionedProtocol) Proxy.newProxyInstance( - protocol.getClassLoader(), new Class[] { protocol }, - new Invoker(protocol, addr, ticket, conf, factory)); - long serverVersion = proxy.getProtocolVersion(protocol.getName(), - clientVersion); - if (serverVersion == clientVersion) { - return proxy; - } else { - throw new VersionMismatch(protocol.getName(), clientVersion, - serverVersion); - } + public static Object getProxy(Class protocol, long clientVersion, + InetSocketAddress addr, + UserGroupInformation ticket, + Configuration conf, + SocketFactory factory) throws IOException { + return getProtocolEngine(protocol,conf) + .getProxy(protocol, clientVersion, addr, ticket, conf, factory); } /** @@ -383,10 +236,9 @@ public static VersionedProtocol getProxy( * @return a proxy instance * @throws IOException */ - public static VersionedProtocol getProxy( - Class protocol, - long clientVersion, InetSocketAddress addr, Configuration conf) - throws IOException { + public static Object getProxy(Class protocol, long clientVersion, + InetSocketAddress addr, Configuration conf) + throws IOException { return getProxy(protocol, clientVersion, addr, conf, NetUtils .getDefaultSocketFactory(conf)); @@ -396,9 +248,9 @@ public static VersionedProtocol getProxy( * Stop this proxy and release its invoker's resource * @param proxy the proxy to be stopped */ - public static void stopProxy(VersionedProtocol proxy) { + public static void stopProxy(Object proxy) { if (proxy!=null) { - ((Invoker)Proxy.getInvocationHandler(proxy)).close(); + getProxyEngine(proxy).stopProxy(proxy); } } @@ -406,6 +258,7 @@ public static void stopProxy(VersionedProtocol proxy) { * Expert: Make multiple, parallel calls to a set of servers. * @deprecated Use {@link #call(Method, Object[][], InetSocketAddress[], UserGroupInformation, Configuration)} instead */ + @Deprecated public static Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, Configuration conf) throws IOException { @@ -418,169 +271,61 @@ public static Object[] call(Method method, Object[][] params, UserGroupInformation ticket, Configuration conf) throws IOException { - Invocation[] invocations = new Invocation[params.length]; - for (int i = 0; i < params.length; i++) - invocations[i] = new Invocation(method, params[i]); - Client client = CLIENTS.getClient(conf); - try { - Writable[] wrappedValues = - client.call(invocations, addrs, method.getDeclaringClass(), ticket); - - if (method.getReturnType() == Void.TYPE) { - return null; - } - - Object[] values = - (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length); - for (int i = 0; i < values.length; i++) - if (wrappedValues[i] != null) - values[i] = ((ObjectWritable)wrappedValues[i]).get(); - - return values; - } finally { - CLIENTS.stopClient(client); - } + return getProtocolEngine(method.getDeclaringClass(), conf) + .call(method, params, addrs, ticket, conf); } /** Construct a server for a protocol implementation instance listening on a - * port and address. */ + * port and address. + * @deprecated protocol interface should be passed. + */ + @Deprecated public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) throws IOException { return getServer(instance, bindAddress, port, 1, false, conf); } /** Construct a server for a protocol implementation instance listening on a - * port and address. */ + * port and address. + * @deprecated protocol interface should be passed. + */ + @Deprecated public static Server getServer(final Object instance, final String bindAddress, final int port, final int numHandlers, final boolean verbose, Configuration conf) throws IOException { - return new Server(instance, conf, bindAddress, port, numHandlers, verbose); + return getServer(instance.getClass(), // use impl class for protocol + instance, bindAddress, port, numHandlers, false, conf); + } + + /** Construct a server for a protocol implementation instance. */ + public static Server getServer(Class protocol, + Object instance, String bindAddress, + int port, Configuration conf) + throws IOException { + return getServer(protocol, instance, bindAddress, port, 1, false, conf); + } + + /** Construct a server for a protocol implementation instance. */ + public static Server getServer(Class protocol, + Object instance, String bindAddress, int port, + int numHandlers, + boolean verbose, Configuration conf) + throws IOException { + + return getProtocolEngine(protocol, conf) + .getServer(protocol, instance, bindAddress, port, numHandlers, verbose, + conf); } /** An RPC Server. */ - public static class Server extends org.apache.hadoop.ipc.Server { - private Object instance; - private boolean verbose; - private boolean authorize = false; - - /** Construct an RPC server. - * @param instance the instance whose methods will be called - * @param conf the configuration to use - * @param bindAddress the address to bind on to listen for connection - * @param port the port to listen for connections on - */ - public Server(Object instance, Configuration conf, String bindAddress, int port) - throws IOException { - this(instance, conf, bindAddress, port, 1, false); - } - - private static String classNameBase(String className) { - String[] names = className.split("\\.", -1); - if (names == null || names.length == 0) { - return className; - } - return names[names.length-1]; - } - - /** Construct an RPC server. - * @param instance the instance whose methods will be called - * @param conf the configuration to use - * @param bindAddress the address to bind on to listen for connection - * @param port the port to listen for connections on - * @param numHandlers the number of method handler threads to run - * @param verbose whether each call should be logged - */ - public Server(Object instance, Configuration conf, String bindAddress, int port, - int numHandlers, boolean verbose) throws IOException { - super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName())); - this.instance = instance; - this.verbose = verbose; - this.authorize = - conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, - false); - } - - public Writable call(Class protocol, Writable param, long receivedTime) - throws IOException { - try { - Invocation call = (Invocation)param; - if (verbose) log("Call: " + call); - - Method method = - protocol.getMethod(call.getMethodName(), - call.getParameterClasses()); - method.setAccessible(true); - - long startTime = System.currentTimeMillis(); - Object value = method.invoke(instance, call.getParameters()); - int processingTime = (int) (System.currentTimeMillis() - startTime); - int qTime = (int) (startTime-receivedTime); - if (LOG.isDebugEnabled()) { - LOG.debug("Served: " + call.getMethodName() + - " queueTime= " + qTime + - " procesingTime= " + processingTime); - } - rpcMetrics.rpcQueueTime.inc(qTime); - rpcMetrics.rpcProcessingTime.inc(processingTime); - - MetricsTimeVaryingRate m = - (MetricsTimeVaryingRate) rpcMetrics.registry.get(call.getMethodName()); - if (m == null) { - try { - m = new MetricsTimeVaryingRate(call.getMethodName(), - rpcMetrics.registry); - } catch (IllegalArgumentException iae) { - // the metrics has been registered; re-fetch the handle - LOG.info("Error register " + call.getMethodName(), iae); - m = (MetricsTimeVaryingRate) rpcMetrics.registry.get( - call.getMethodName()); - } - } - m.inc(processingTime); - - if (verbose) log("Return: "+value); - - return new ObjectWritable(method.getReturnType(), value); - - } catch (InvocationTargetException e) { - Throwable target = e.getTargetException(); - if (target instanceof IOException) { - throw (IOException)target; - } else { - IOException ioe = new IOException(target.toString()); - ioe.setStackTrace(target.getStackTrace()); - throw ioe; - } - } catch (Throwable e) { - if (!(e instanceof IOException)) { - LOG.error("Unexpected throwable object ", e); - } - IOException ioe = new IOException(e.toString()); - ioe.setStackTrace(e.getStackTrace()); - throw ioe; - } - } - - @Override - public void authorize(Subject user, ConnectionHeader connection) - throws AuthorizationException { - if (authorize) { - Class protocol = null; - try { - protocol = getProtocolClass(connection.getProtocol(), getConf()); - } catch (ClassNotFoundException cfne) { - throw new AuthorizationException("Unknown protocol: " + - connection.getProtocol()); - } - ServiceAuthorizationManager.authorize(user, protocol); - } + public abstract static class Server extends org.apache.hadoop.ipc.Server { + + protected Server(String bindAddress, int port, + Class paramClass, int handlerCount, + Configuration conf, String serverName) throws IOException { + super(bindAddress, port, paramClass, handlerCount, conf, serverName); } } - private static void log(String value) { - if (value!= null && value.length() > 55) - value = value.substring(0, 55)+"..."; - LOG.info(value); - } } diff --git a/src/java/org/apache/hadoop/ipc/RpcEngine.java b/src/java/org/apache/hadoop/ipc/RpcEngine.java new file mode 100644 index 00000000000..baed883e66e --- /dev/null +++ b/src/java/org/apache/hadoop/ipc/RpcEngine.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.lang.reflect.Method; +import java.io.IOException; +import java.net.InetSocketAddress; +import javax.net.SocketFactory; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.conf.Configuration; + +/** An RPC implementation. */ +interface RpcEngine { + + /** Construct a client-side proxy object. */ + Object getProxy(Class protocol, + long clientVersion, InetSocketAddress addr, + UserGroupInformation ticket, Configuration conf, + SocketFactory factory) throws IOException; + + /** Stop this proxy. */ + void stopProxy(Object proxy); + + /** Expert: Make multiple, parallel calls to a set of servers. */ + Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, + UserGroupInformation ticket, Configuration conf) + throws IOException; + + /** Construct a server for a protocol implementation instance. */ + RPC.Server getServer(Class protocol, Object instance, String bindAddress, + int port, int numHandlers, boolean verbose, + Configuration conf) throws IOException; + +} diff --git a/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java new file mode 100644 index 00000000000..07da74a7044 --- /dev/null +++ b/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -0,0 +1,415 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.lang.reflect.Proxy; +import java.lang.reflect.Method; +import java.lang.reflect.Array; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; + +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.io.*; +import java.util.Map; +import java.util.HashMap; + +import javax.net.SocketFactory; +import javax.security.auth.Subject; +import javax.security.auth.login.LoginException; + +import org.apache.commons.logging.*; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate; + +/** An RpcEngine implementation for Writable data. */ +class WritableRpcEngine implements RpcEngine { + private static final Log LOG = LogFactory.getLog(RPC.class); + + /** A method invocation, including the method name and its parameters.*/ + private static class Invocation implements Writable, Configurable { + private String methodName; + private Class[] parameterClasses; + private Object[] parameters; + private Configuration conf; + + public Invocation() {} + + public Invocation(Method method, Object[] parameters) { + this.methodName = method.getName(); + this.parameterClasses = method.getParameterTypes(); + this.parameters = parameters; + } + + /** The name of the method invoked. */ + public String getMethodName() { return methodName; } + + /** The parameter classes. */ + public Class[] getParameterClasses() { return parameterClasses; } + + /** The parameter instances. */ + public Object[] getParameters() { return parameters; } + + public void readFields(DataInput in) throws IOException { + methodName = UTF8.readString(in); + parameters = new Object[in.readInt()]; + parameterClasses = new Class[parameters.length]; + ObjectWritable objectWritable = new ObjectWritable(); + for (int i = 0; i < parameters.length; i++) { + parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf); + parameterClasses[i] = objectWritable.getDeclaredClass(); + } + } + + public void write(DataOutput out) throws IOException { + UTF8.writeString(out, methodName); + out.writeInt(parameterClasses.length); + for (int i = 0; i < parameterClasses.length; i++) { + ObjectWritable.writeObject(out, parameters[i], parameterClasses[i], + conf); + } + } + + public String toString() { + StringBuffer buffer = new StringBuffer(); + buffer.append(methodName); + buffer.append("("); + for (int i = 0; i < parameters.length; i++) { + if (i != 0) + buffer.append(", "); + buffer.append(parameters[i]); + } + buffer.append(")"); + return buffer.toString(); + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public Configuration getConf() { + return this.conf; + } + + } + + /* Cache a client using its socket factory as the hash key */ + static private class ClientCache { + private Map clients = + new HashMap(); + + /** + * Construct & cache an IPC client with the user-provided SocketFactory + * if no cached client exists. + * + * @param conf Configuration + * @return an IPC client + */ + private synchronized Client getClient(Configuration conf, + SocketFactory factory) { + // Construct & cache client. The configuration is only used for timeout, + // and Clients have connection pools. So we can either (a) lose some + // connection pooling and leak sockets, or (b) use the same timeout for all + // configurations. Since the IPC is usually intended globally, not + // per-job, we choose (a). + Client client = clients.get(factory); + if (client == null) { + client = new Client(ObjectWritable.class, conf, factory); + clients.put(factory, client); + } else { + client.incCount(); + } + return client; + } + + /** + * Construct & cache an IPC client with the default SocketFactory + * if no cached client exists. + * + * @param conf Configuration + * @return an IPC client + */ + private synchronized Client getClient(Configuration conf) { + return getClient(conf, SocketFactory.getDefault()); + } + + /** + * Stop a RPC client connection + * A RPC client is closed only when its reference count becomes zero. + */ + private void stopClient(Client client) { + synchronized (this) { + client.decCount(); + if (client.isZeroReference()) { + clients.remove(client.getSocketFactory()); + } + } + if (client.isZeroReference()) { + client.stop(); + } + } + } + + private static ClientCache CLIENTS=new ClientCache(); + + private static class Invoker implements InvocationHandler { + private Class protocol; + private InetSocketAddress address; + private UserGroupInformation ticket; + private Client client; + private boolean isClosed = false; + + public Invoker(Class protocol, + InetSocketAddress address, UserGroupInformation ticket, + Configuration conf, SocketFactory factory) { + this.protocol = protocol; + this.address = address; + this.ticket = ticket; + this.client = CLIENTS.getClient(conf, factory); + } + + public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + final boolean logDebug = LOG.isDebugEnabled(); + long startTime = 0; + if (logDebug) { + startTime = System.currentTimeMillis(); + } + + ObjectWritable value = (ObjectWritable) + client.call(new Invocation(method, args), address, + protocol, ticket); + if (logDebug) { + long callTime = System.currentTimeMillis() - startTime; + LOG.debug("Call: " + method.getName() + " " + callTime); + } + return value.get(); + } + + /* close the IPC client that's responsible for this invoker's RPCs */ + synchronized private void close() { + if (!isClosed) { + isClosed = true; + CLIENTS.stopClient(client); + } + } + } + + /** Construct a client-side proxy object that implements the named protocol, + * talking to a server at the named address. */ + public Object getProxy(Class protocol, long clientVersion, + InetSocketAddress addr, UserGroupInformation ticket, + Configuration conf, SocketFactory factory) + throws IOException { + + Object proxy = Proxy.newProxyInstance + (protocol.getClassLoader(), new Class[] { protocol }, + new Invoker(protocol, addr, ticket, conf, factory)); + if (proxy instanceof VersionedProtocol) { + long serverVersion = ((VersionedProtocol)proxy) + .getProtocolVersion(protocol.getName(), clientVersion); + if (serverVersion != clientVersion) { + throw new RPC.VersionMismatch(protocol.getName(), clientVersion, + serverVersion); + } + } + return proxy; + } + + /** + * Stop this proxy and release its invoker's resource + * @param proxy the proxy to be stopped + */ + public void stopProxy(Object proxy) { + ((Invoker)Proxy.getInvocationHandler(proxy)).close(); + } + + + /** Expert: Make multiple, parallel calls to a set of servers. */ + public Object[] call(Method method, Object[][] params, + InetSocketAddress[] addrs, + UserGroupInformation ticket, Configuration conf) + throws IOException { + + Invocation[] invocations = new Invocation[params.length]; + for (int i = 0; i < params.length; i++) + invocations[i] = new Invocation(method, params[i]); + Client client = CLIENTS.getClient(conf); + try { + Writable[] wrappedValues = + client.call(invocations, addrs, method.getDeclaringClass(), ticket); + + if (method.getReturnType() == Void.TYPE) { + return null; + } + + Object[] values = + (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length); + for (int i = 0; i < values.length; i++) + if (wrappedValues[i] != null) + values[i] = ((ObjectWritable)wrappedValues[i]).get(); + + return values; + } finally { + CLIENTS.stopClient(client); + } + } + + /** Construct a server for a protocol implementation instance listening on a + * port and address. */ + public Server getServer(Class protocol, + Object instance, String bindAddress, int port, + int numHandlers, boolean verbose, Configuration conf) + throws IOException { + return new Server(instance, conf, bindAddress, port, numHandlers, verbose); + } + + /** An RPC Server. */ + public static class Server extends RPC.Server { + private Object instance; + private boolean verbose; + private boolean authorize = false; + + /** Construct an RPC server. + * @param instance the instance whose methods will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + */ + public Server(Object instance, Configuration conf, String bindAddress, int port) + throws IOException { + this(instance, conf, bindAddress, port, 1, false); + } + + private static String classNameBase(String className) { + String[] names = className.split("\\.", -1); + if (names == null || names.length == 0) { + return className; + } + return names[names.length-1]; + } + + /** Construct an RPC server. + * @param instance the instance whose methods will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + * @param numHandlers the number of method handler threads to run + * @param verbose whether each call should be logged + */ + public Server(Object instance, Configuration conf, String bindAddress, int port, + int numHandlers, boolean verbose) throws IOException { + super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName())); + this.instance = instance; + this.verbose = verbose; + this.authorize = + conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, + false); + } + + public Writable call(Class protocol, Writable param, long receivedTime) + throws IOException { + try { + Invocation call = (Invocation)param; + if (verbose) log("Call: " + call); + + Method method = + protocol.getMethod(call.getMethodName(), + call.getParameterClasses()); + method.setAccessible(true); + + long startTime = System.currentTimeMillis(); + Object value = method.invoke(instance, call.getParameters()); + int processingTime = (int) (System.currentTimeMillis() - startTime); + int qTime = (int) (startTime-receivedTime); + if (LOG.isDebugEnabled()) { + LOG.debug("Served: " + call.getMethodName() + + " queueTime= " + qTime + + " procesingTime= " + processingTime); + } + rpcMetrics.rpcQueueTime.inc(qTime); + rpcMetrics.rpcProcessingTime.inc(processingTime); + + MetricsTimeVaryingRate m = + (MetricsTimeVaryingRate) rpcMetrics.registry.get(call.getMethodName()); + if (m == null) { + try { + m = new MetricsTimeVaryingRate(call.getMethodName(), + rpcMetrics.registry); + } catch (IllegalArgumentException iae) { + // the metrics has been registered; re-fetch the handle + LOG.info("Error register " + call.getMethodName(), iae); + m = (MetricsTimeVaryingRate) rpcMetrics.registry.get( + call.getMethodName()); + } + } + m.inc(processingTime); + + if (verbose) log("Return: "+value); + + return new ObjectWritable(method.getReturnType(), value); + + } catch (InvocationTargetException e) { + Throwable target = e.getTargetException(); + if (target instanceof IOException) { + throw (IOException)target; + } else { + IOException ioe = new IOException(target.toString()); + ioe.setStackTrace(target.getStackTrace()); + throw ioe; + } + } catch (Throwable e) { + if (!(e instanceof IOException)) { + LOG.error("Unexpected throwable object ", e); + } + IOException ioe = new IOException(e.toString()); + ioe.setStackTrace(e.getStackTrace()); + throw ioe; + } + } + + @Override + public void authorize(Subject user, ConnectionHeader connection) + throws AuthorizationException { + if (authorize) { + Class protocol = null; + try { + protocol = getProtocolClass(connection.getProtocol(), getConf()); + } catch (ClassNotFoundException cfne) { + throw new AuthorizationException("Unknown protocol: " + + connection.getProtocol()); + } + ServiceAuthorizationManager.authorize(user, protocol); + } + } + } + + private static void log(String value) { + if (value!= null && value.length() > 55) + value = value.substring(0, 55)+"..."; + LOG.info(value); + } +} diff --git a/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java b/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java index 703d8cb83c2..8aaa4e94fe0 100644 --- a/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java +++ b/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java @@ -61,14 +61,16 @@ public int error() throws Problem { public void testCalls() throws Exception { Configuration conf = new Configuration(); - Server server = AvroRpc.getServer(new TestImpl(), ADDRESS, 0, conf); + RPC.setProtocolEngine(conf, AvroTestProtocol.class, AvroRpcEngine.class); + Server server = RPC.getServer(AvroTestProtocol.class, + new TestImpl(), ADDRESS, 0, conf); AvroTestProtocol proxy = null; try { server.start(); InetSocketAddress addr = NetUtils.getConnectAddress(server); proxy = - (AvroTestProtocol)AvroRpc.getProxy(AvroTestProtocol.class, addr, conf); + (AvroTestProtocol)RPC.getProxy(AvroTestProtocol.class, 0, addr, conf); proxy.ping(); diff --git a/src/test/core/org/apache/hadoop/ipc/TestRPC.java b/src/test/core/org/apache/hadoop/ipc/TestRPC.java index e12cfaab73a..e825c587b65 100644 --- a/src/test/core/org/apache/hadoop/ipc/TestRPC.java +++ b/src/test/core/org/apache/hadoop/ipc/TestRPC.java @@ -190,7 +190,8 @@ public void run() { public void testSlowRpc() throws Exception { System.out.println("Testing Slow RPC"); // create a server with two handlers - Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, 2, false, conf); + Server server = RPC.getServer(TestProtocol.class, + new TestImpl(), ADDRESS, 0, 2, false, conf); TestProtocol proxy = null; try { @@ -230,9 +231,9 @@ public void testSlowRpc() throws Exception { } } - public void testCalls(Configuration conf) throws Exception { - Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf); + Server server = RPC.getServer(TestProtocol.class, + new TestImpl(), ADDRESS, 0, conf); TestProtocol proxy = null; try { server.start(); @@ -306,8 +307,8 @@ public void testCalls(Configuration conf) throws Exception { assertTrue(Arrays.equals(strings, new String[]{"a","b"})); Method ping = TestProtocol.class.getMethod("ping", new Class[] {}); - Object[] voids = (Object[])RPC.call(ping, new Object[][]{{},{}}, - new InetSocketAddress[] {addr, addr}, conf); + Object[] voids = RPC.call(ping, new Object[][]{{},{}}, + new InetSocketAddress[] {addr, addr}, conf); assertEquals(voids, null); } finally { server.stop(); @@ -339,7 +340,8 @@ public Service[] getServices() { private void doRPCs(Configuration conf, boolean expectFailure) throws Exception { SecurityUtil.setPolicy(new ConfiguredPolicy(conf, new TestPolicyProvider())); - Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, 5, true, conf); + Server server = RPC.getServer(TestProtocol.class, + new TestImpl(), ADDRESS, 0, 5, true, conf); TestProtocol proxy = null;