From 19e62e9b3d51fdee870b4a32580cffb583f3029f Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Fri, 6 Jan 2012 10:59:56 +0000 Subject: [PATCH] svn merge -c 1164771 from trunk for HADOOP-7524. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1228081 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 4 + .../java/org/apache/hadoop/ipc/Client.java | 4 +- .../org/apache/hadoop/ipc/ProtocolInfo.java | 38 ++ .../main/java/org/apache/hadoop/ipc/RPC.java | 32 +- .../java/org/apache/hadoop/ipc/Server.java | 21 +- .../apache/hadoop/ipc/VersionedProtocol.java | 1 - .../apache/hadoop/ipc/WritableRpcEngine.java | 349 ++++++++++++++++-- .../java/org/apache/hadoop/ipc/TestIPC.java | 2 +- .../hadoop/ipc/TestIPCServerResponder.java | 2 +- .../ipc/TestMultipleProtocolServer.java | 255 +++++++++++++ .../hadoop/ipc/TestRPCCompatibility.java | 39 +- 11 files changed, 681 insertions(+), 66 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 5778d51c540..c37b2848914 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -1,5 +1,9 @@ Hadoop Change Log +Release 0.23-PB - Unreleased + + HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia) + Release 0.23.1 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 6d0b4dfed89..0acb8f8af90 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -287,8 +287,8 @@ public class Client { authMethod = AuthMethod.KERBEROS; } - header = new ConnectionHeader(protocol == null ? null : protocol - .getName(), ticket, authMethod); + header = + new ConnectionHeader(RPC.getProtocolName(protocol), ticket, authMethod); if (LOG.isDebugEnabled()) LOG.debug("Use " + authMethod + " authentication for protocol " diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java new file mode 100644 index 00000000000..924fa8b1501 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + + +/** + * The protocol name that is used when a client and server connect. + * By default the class name of the protocol interface is the protocol name. + * + * Why override the default name (i.e. the class name)? + * One use case overriding the default name (i.e. the class name) is when + * there are multiple implementations of the same protocol, each with say a + * different version/serialization. + * In Hadoop this is used to allow multiple server and client adapters + * for different versions of the same protocol service. + */ +@Retention(RetentionPolicy.RUNTIME) +public @interface ProtocolInfo { + String protocolName(); // the name of the protocol (i.e. rpc service) +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index b42b9133f51..453a5dd1750 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -62,6 +62,20 @@ import org.apache.hadoop.util.ReflectionUtils; */ public class RPC { static final Log LOG = LogFactory.getLog(RPC.class); + + + /** + * Get the protocol name. + * If the protocol class has a ProtocolAnnotation, then get the protocol + * name from the annotation; otherwise the class name is the protocol name. + */ + static public String getProtocolName(Class protocol) { + if (protocol == null) { + return null; + } + ProtocolInfo anno = (ProtocolInfo) protocol.getAnnotation(ProtocolInfo.class); + return (anno == null) ? protocol.getName() : anno.protocolName(); + } private RPC() {} // no public ctor @@ -553,8 +567,10 @@ public class RPC { } /** Construct a server for a protocol implementation instance. */ - public static Server getServer(Class protocol, - Object instance, String bindAddress, int port, + + public static + Server getServer(Class protocol, + IMPL instance, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, SecretManager secretManager) @@ -576,6 +592,18 @@ public class RPC { super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler, conf, serverName, secretManager); } + + /** + * Add a protocol to the existing server. + * @param protocolClass - the protocol class + * @param protocolImpl - the impl of the protocol that will be called + * @return the server (for convenience) + */ + public + Server addProtocol(Class protocolClass, IMPL protocolImpl + ) throws IOException { + throw new IOException("addProtocol Not Implemented"); + } } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 74755ce3e43..34819f79445 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -893,7 +893,7 @@ public abstract class Server { private InetAddress addr; ConnectionHeader header = new ConnectionHeader(); - Class protocol; + String protocolName; boolean useSasl; SaslServer saslServer; private AuthMethod authMethod; @@ -1284,15 +1284,8 @@ public abstract class Server { DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf)); header.readFields(in); - try { - String protocolClassName = header.getProtocol(); - if (protocolClassName != null) { - protocol = getProtocolClass(header.getProtocol(), conf); - rpcDetailedMetrics.init(protocol); - } - } catch (ClassNotFoundException cnfe) { - throw new IOException("Unknown protocol: " + header.getProtocol()); - } + protocolName = header.getProtocol(); + UserGroupInformation protocolUser = header.getUgi(); if (!useSasl) { @@ -1481,7 +1474,7 @@ public abstract class Server { // Make the call as the user via Subject.doAs, thus associating // the call with the Subject if (call.connection.user == null) { - value = call(call.connection.protocol, call.param, + value = call(call.connection.protocolName, call.param, call.timestamp); } else { value = @@ -1490,7 +1483,7 @@ public abstract class Server { @Override public Writable run() throws Exception { // make the call - return call(call.connection.protocol, + return call(call.connection.protocolName, call.param, call.timestamp); } @@ -1758,7 +1751,7 @@ public abstract class Server { /** * Called for each call. - * @deprecated Use {@link #call(Class, Writable, long)} instead + * @deprecated Use {@link #call(String, Writable, long)} instead */ @Deprecated public Writable call(Writable param, long receiveTime) throws IOException { @@ -1766,7 +1759,7 @@ public abstract class Server { } /** Called for each call. */ - public abstract Writable call(Class protocol, + public abstract Writable call(String protocol, Writable param, long receiveTime) throws IOException; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java index 4558f2150dc..4d02027a0e6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java @@ -34,7 +34,6 @@ public interface VersionedProtocol { * @return the version that the server will speak * @throws IOException if any IO error occurs */ - @Deprecated public long getProtocolVersion(String protocol, long clientVersion) throws IOException; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index e587913923e..b28949d99a4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -27,6 +27,9 @@ import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; import java.io.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.HashMap; @@ -35,6 +38,7 @@ import javax.net.SocketFactory; import org.apache.commons.logging.*; import org.apache.hadoop.io.*; +import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; @@ -47,10 +51,46 @@ import org.apache.hadoop.conf.*; public class WritableRpcEngine implements RpcEngine { private static final Log LOG = LogFactory.getLog(RPC.class); + + /** + * Get all superInterfaces that extend VersionedProtocol + * @param childInterfaces + * @return the super interfaces that extend VersionedProtocol + */ + private static Class[] getSuperInterfaces(Class[] childInterfaces) { + List> allInterfaces = new ArrayList>(); + + for (Class childInterface : childInterfaces) { + if (VersionedProtocol.class.isAssignableFrom(childInterface)) { + allInterfaces.add(childInterface); + allInterfaces.addAll( + Arrays.asList( + getSuperInterfaces(childInterface.getInterfaces()))); + } else { + LOG.warn("Interface " + childInterface + + " ignored because it does not extend VersionedProtocol"); + } + } + return (Class[]) allInterfaces.toArray(new Class[allInterfaces.size()]); + } + + /** + * Get all interfaces that the given protocol implements or extends + * which are assignable from VersionedProtocol. + */ + private static Class[] getProtocolInterfaces(Class protocol) { + Class[] interfaces = protocol.getInterfaces(); + return getSuperInterfaces(interfaces); + } + + //writableRpcVersion should be updated if there is a change //in format of the rpc messages. - public static long writableRpcVersion = 1L; + + // 2L - added declared class to Invocation + public static final long writableRpcVersion = 2L; + /** A method invocation, including the method name and its parameters.*/ private static class Invocation implements Writable, Configurable { private String methodName; @@ -59,11 +99,13 @@ public class WritableRpcEngine implements RpcEngine { private Configuration conf; private long clientVersion; private int clientMethodsHash; + private String declaringClassProtocolName; //This could be different from static writableRpcVersion when received //at server, if client is using a different version. private long rpcVersion; + @SuppressWarnings("unused") // called when deserializing an invocation public Invocation() {} public Invocation(Method method, Object[] parameters) { @@ -88,6 +130,8 @@ public class WritableRpcEngine implements RpcEngine { this.clientMethodsHash = ProtocolSignature.getFingerprint(method .getDeclaringClass().getMethods()); } + this.declaringClassProtocolName = + RPC.getProtocolName(method.getDeclaringClass()); } /** The name of the method invoked. */ @@ -103,6 +147,7 @@ public class WritableRpcEngine implements RpcEngine { return clientVersion; } + @SuppressWarnings("unused") private int getClientMethodsHash() { return clientMethodsHash; } @@ -115,8 +160,10 @@ public class WritableRpcEngine implements RpcEngine { return rpcVersion; } + @SuppressWarnings("deprecation") public void readFields(DataInput in) throws IOException { rpcVersion = in.readLong(); + declaringClassProtocolName = UTF8.readString(in); methodName = UTF8.readString(in); clientVersion = in.readLong(); clientMethodsHash = in.readInt(); @@ -124,13 +171,16 @@ public class WritableRpcEngine implements RpcEngine { parameterClasses = new Class[parameters.length]; ObjectWritable objectWritable = new ObjectWritable(); for (int i = 0; i < parameters.length; i++) { - parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf); + parameters[i] = + ObjectWritable.readObject(in, objectWritable, this.conf); parameterClasses[i] = objectWritable.getDeclaredClass(); } } + @SuppressWarnings("deprecation") public void write(DataOutput out) throws IOException { out.writeLong(rpcVersion); + UTF8.writeString(out, declaringClassProtocolName); UTF8.writeString(out, methodName); out.writeLong(clientVersion); out.writeInt(clientMethodsHash); @@ -273,30 +323,161 @@ public class WritableRpcEngine implements RpcEngine { /** Construct a server for a protocol implementation instance listening on a * port and address. */ - public Server getServer(Class protocol, - Object instance, String bindAddress, int port, - int numHandlers, int numReaders, int queueSizePerHandler, - boolean verbose, Configuration conf, + public RPC.Server getServer(Class protocolClass, + Object protocolImpl, String bindAddress, int port, + int numHandlers, int numReaders, int queueSizePerHandler, + boolean verbose, Configuration conf, SecretManager secretManager) throws IOException { - return new Server(instance, conf, bindAddress, port, numHandlers, - numReaders, queueSizePerHandler, verbose, secretManager); + return new Server(protocolClass, protocolImpl, conf, bindAddress, port, + numHandlers, numReaders, queueSizePerHandler, verbose, secretManager); } + /** An RPC Server. */ public static class Server extends RPC.Server { - private Object instance; private boolean verbose; + + /** + * The key in Map + */ + static class ProtoNameVer { + final String protocol; + final long version; + ProtoNameVer(String protocol, long ver) { + this.protocol = protocol; + this.version = ver; + } + @Override + public boolean equals(Object o) { + if (o == null) + return false; + if (this == o) + return true; + if (! (o instanceof ProtoNameVer)) + return false; + ProtoNameVer pv = (ProtoNameVer) o; + return ((pv.protocol.equals(this.protocol)) && + (pv.version == this.version)); + } + @Override + public int hashCode() { + return protocol.hashCode() * 37 + (int) version; + } + } + + /** + * The value in map + */ + static class ProtoClassProtoImpl { + final Class protocolClass; + final Object protocolImpl; + ProtoClassProtoImpl(Class protocolClass, Object protocolImpl) { + this.protocolClass = protocolClass; + this.protocolImpl = protocolImpl; + } + } + + private Map protocolImplMap = + new HashMap(10); + + // Register protocol and its impl for rpc calls + private void registerProtocolAndImpl(Class protocolClass, + Object protocolImpl) throws IOException { + String protocolName = RPC.getProtocolName(protocolClass); + VersionedProtocol vp = (VersionedProtocol) protocolImpl; + long version; + try { + version = vp.getProtocolVersion(protocolName, 0); + } catch (Exception ex) { + LOG.warn("Protocol " + protocolClass + + " NOT registered as getProtocolVersion throws exception "); + return; + } + protocolImplMap.put(new ProtoNameVer(protocolName, version), + new ProtoClassProtoImpl(protocolClass, protocolImpl)); + LOG.info("ProtocolImpl=" + protocolImpl.getClass().getName() + + " protocolClass=" + protocolClass.getName() + " version=" + version); + } + + private static class VerProtocolImpl { + final long version; + final ProtoClassProtoImpl protocolTarget; + VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) { + this.version = ver; + this.protocolTarget = protocolTarget; + } + } + + + @SuppressWarnings("unused") // will be useful later. + private VerProtocolImpl[] getSupportedProtocolVersions( + String protocolName) { + VerProtocolImpl[] resultk = new VerProtocolImpl[protocolImplMap.size()]; + int i = 0; + for (Map.Entry pv : + protocolImplMap.entrySet()) { + if (pv.getKey().protocol.equals(protocolName)) { + resultk[i++] = + new VerProtocolImpl(pv.getKey().version, pv.getValue()); + } + } + if (i == 0) { + return null; + } + VerProtocolImpl[] result = new VerProtocolImpl[i]; + System.arraycopy(resultk, 0, result, 0, i); + return result; + } + + private VerProtocolImpl getHighestSupportedProtocol(String protocolName) { + Long highestVersion = 0L; + ProtoClassProtoImpl highest = null; + for (Map.Entry pv : protocolImplMap + .entrySet()) { + if (pv.getKey().protocol.equals(protocolName)) { + if ((highest == null) || (pv.getKey().version > highestVersion)) { + highest = pv.getValue(); + highestVersion = pv.getKey().version; + } + } + } + if (highest == null) { + return null; + } + return new VerProtocolImpl(highestVersion, highest); + } + /** Construct an RPC server. * @param instance the instance whose methods will be called * @param conf the configuration to use * @param bindAddress the address to bind on to listen for connection * @param port the port to listen for connections on + * + * @deprecated Use #Server(Class, Object, Configuration, String, int) + * */ - public Server(Object instance, Configuration conf, String bindAddress, int port) + @Deprecated + public Server(Object instance, Configuration conf, String bindAddress, + int port) throws IOException { - this(instance, conf, bindAddress, port, 1, -1, -1, false, null); + this(null, instance, conf, bindAddress, port); + } + + + /** Construct an RPC server. + * @param protocol class + * @param instance the instance whose methods will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + */ + public Server(Class protocolClass, Object protocolImpl, + Configuration conf, String bindAddress, int port) + throws IOException { + this(protocolClass, protocolImpl, conf, bindAddress, port, 1, -1, -1, + false, null); } private static String classNameBase(String className) { @@ -307,35 +488,103 @@ public class WritableRpcEngine implements RpcEngine { return names[names.length-1]; } + /** Construct an RPC server. - * @param instance the instance whose methods will be called + * @param protocolImpl the instance whose methods will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + * @param numHandlers the number of method handler threads to run + * @param verbose whether each call should be logged + * + * @deprecated use Server#Server(Class, Object, + * Configuration, String, int, int, int, int, boolean, SecretManager) + */ + @Deprecated + public Server(Object protocolImpl, Configuration conf, String bindAddress, + int port, int numHandlers, int numReaders, int queueSizePerHandler, + boolean verbose, SecretManager secretManager) + throws IOException { + this(null, protocolImpl, conf, bindAddress, port, + numHandlers, numReaders, queueSizePerHandler, verbose, + secretManager); + + } + + /** Construct an RPC server. + * @param protocolClass - the protocol being registered + * can be null for compatibility with old usage (see below for details) + * @param protocolImpl the protocol impl that will be called * @param conf the configuration to use * @param bindAddress the address to bind on to listen for connection * @param port the port to listen for connections on * @param numHandlers the number of method handler threads to run * @param verbose whether each call should be logged */ - public Server(Object instance, Configuration conf, String bindAddress, int port, - int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, - SecretManager secretManager) + public Server(Class protocolClass, Object protocolImpl, + Configuration conf, String bindAddress, int port, + int numHandlers, int numReaders, int queueSizePerHandler, + boolean verbose, SecretManager secretManager) throws IOException { super(bindAddress, port, Invocation.class, numHandlers, numReaders, queueSizePerHandler, conf, - classNameBase(instance.getClass().getName()), secretManager); - this.instance = instance; + classNameBase(protocolImpl.getClass().getName()), secretManager); + this.verbose = verbose; + + + Class[] protocols; + if (protocolClass == null) { // derive protocol from impl + /* + * In order to remain compatible with the old usage where a single + * target protocolImpl is suppled for all protocol interfaces, and + * the protocolImpl is derived from the protocolClass(es) + * we register all interfaces extended by the protocolImpl + */ + protocols = getProtocolInterfaces(protocolImpl.getClass()); + + } else { + if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) { + throw new IOException("protocolClass "+ protocolClass + + " is not implemented by protocolImpl which is of class " + + protocolImpl.getClass()); + } + // register protocol class and its super interfaces + registerProtocolAndImpl(protocolClass, protocolImpl); + protocols = getProtocolInterfaces(protocolClass); + } + for (Class p : protocols) { + if (!p.equals(VersionedProtocol.class)) { + registerProtocolAndImpl(p, protocolImpl); + } + } + } - public Writable call(Class protocol, Writable param, long receivedTime) + + @Override + public Server + addProtocol( + Class protocolClass, IMPL protocolImpl) throws IOException { + registerProtocolAndImpl(protocolClass, protocolImpl); + return this; + } + + /** + * Process a client call + * @param protocolName - the protocol name (the class of the client proxy + * used to make calls to the rpc server. + * @param param parameters + * @param receivedTime time at which the call receoved (for metrics) + * @return the call's return + * @throws IOException + */ + public Writable call(String protocolName, Writable param, long receivedTime) throws IOException { try { Invocation call = (Invocation)param; if (verbose) log("Call: " + call); - Method method = protocol.getMethod(call.getMethodName(), - call.getParameterClasses()); - method.setAccessible(true); - // Verify rpc version if (call.getRpcVersion() != writableRpcVersion) { // Client is using a different version of WritableRpc @@ -344,25 +593,51 @@ public class WritableRpcEngine implements RpcEngine { + call.getRpcVersion() + ", server side version=" + writableRpcVersion); } - - //Verify protocol version. - //Bypass the version check for VersionedProtocol - if (!method.getDeclaringClass().equals(VersionedProtocol.class)) { - long clientVersion = call.getProtocolVersion(); - ProtocolSignature serverInfo = ((VersionedProtocol) instance) - .getProtocolSignature(protocol.getCanonicalName(), call - .getProtocolVersion(), call.getClientMethodsHash()); - long serverVersion = serverInfo.getVersion(); - if (serverVersion != clientVersion) { - LOG.warn("Version mismatch: client version=" + clientVersion - + ", server version=" + serverVersion); - throw new RPC.VersionMismatch(protocol.getName(), clientVersion, - serverVersion); + + long clientVersion = call.getProtocolVersion(); + final String protoName; + ProtoClassProtoImpl protocolImpl; + if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) { + // VersionProtocol methods are often used by client to figure out + // which version of protocol to use. + // + // Versioned protocol methods should go the protocolName protocol + // rather than the declaring class of the method since the + // the declaring class is VersionedProtocol which is not + // registered directly. + // Send the call to the highest protocol version + protocolImpl = + getHighestSupportedProtocol(protocolName).protocolTarget; + } else { + protoName = call.declaringClassProtocolName; + + // Find the right impl for the protocol based on client version. + ProtoNameVer pv = + new ProtoNameVer(call.declaringClassProtocolName, clientVersion); + protocolImpl = protocolImplMap.get(pv); + if (protocolImpl == null) { // no match for Protocol AND Version + VerProtocolImpl highest = + getHighestSupportedProtocol(protoName); + if (highest == null) { + throw new IOException("Unknown protocol: " + protoName); + } else { // protocol supported but not the version that client wants + throw new RPC.VersionMismatch(protoName, clientVersion, + highest.version); + } } } + + + // Invoke the protocol method long startTime = System.currentTimeMillis(); - Object value = method.invoke(instance, call.getParameters()); + Method method = + protocolImpl.protocolClass.getMethod(call.getMethodName(), + call.getParameterClasses()); + method.setAccessible(true); + rpcDetailedMetrics.init(protocolImpl.protocolClass); + Object value = + method.invoke(protocolImpl.protocolImpl, call.getParameters()); int processingTime = (int) (System.currentTimeMillis() - startTime); int qTime = (int) (startTime-receivedTime); if (LOG.isDebugEnabled()) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 72409d50cb1..7c01e2f191a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -96,7 +96,7 @@ public class TestIPC { } @Override - public Writable call(Class protocol, Writable param, long receiveTime) + public Writable call(String protocol, Writable param, long receiveTime) throws IOException { if (sleep) { // sleep a bit diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java index 3710198295c..d4400effa7f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java @@ -72,7 +72,7 @@ public class TestIPCServerResponder extends TestCase { } @Override - public Writable call(Class protocol, Writable param, long receiveTime) + public Writable call(String protocol, Writable param, long receiveTime) throws IOException { if (sleep) { try { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java new file mode 100644 index 00000000000..203c2855bcb --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.junit.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.junit.Before; +import org.junit.After; +import org.junit.Test; + +public class TestMultipleProtocolServer { + private static final String ADDRESS = "0.0.0.0"; + private static InetSocketAddress addr; + private static RPC.Server server; + + private static Configuration conf = new Configuration(); + + + @ProtocolInfo(protocolName="Foo") + interface Foo0 extends VersionedProtocol { + public static final long versionID = 0L; + String ping() throws IOException; + + } + + @ProtocolInfo(protocolName="Foo") + interface Foo1 extends VersionedProtocol { + public static final long versionID = 1L; + String ping() throws IOException; + String ping2() throws IOException; + } + + @ProtocolInfo(protocolName="Foo") + interface FooUnimplemented extends VersionedProtocol { + public static final long versionID = 2L; + String ping() throws IOException; + } + + interface Mixin extends VersionedProtocol{ + public static final long versionID = 0L; + void hello() throws IOException; + } + interface Bar extends Mixin, VersionedProtocol { + public static final long versionID = 0L; + int echo(int i) throws IOException; + } + + + + class Foo0Impl implements Foo0 { + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return Foo0.versionID; + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + Class inter; + try { + inter = (Class)getClass(). + getGenericInterfaces()[0]; + } catch (Exception e) { + throw new IOException(e); + } + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + getProtocolVersion(protocol, clientVersion), inter); + } + + @Override + public String ping() { + return "Foo0"; + } + + } + + class Foo1Impl implements Foo1 { + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return Foo1.versionID; + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + Class inter; + try { + inter = (Class)getClass(). + getGenericInterfaces()[0]; + } catch (Exception e) { + throw new IOException(e); + } + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + getProtocolVersion(protocol, clientVersion), inter); + } + + @Override + public String ping() { + return "Foo1"; + } + + @Override + public String ping2() { + return "Foo1"; + + } + + } + + + class BarImpl implements Bar { + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return Bar.versionID; + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + Class inter; + try { + inter = (Class)getClass(). + getGenericInterfaces()[0]; + } catch (Exception e) { + throw new IOException(e); + } + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + getProtocolVersion(protocol, clientVersion), inter); + } + + @Override + public int echo(int i) { + return i; + } + + @Override + public void hello() { + + + } + } + @Before + public void setUp() throws Exception { + // create a server with two handlers + server = RPC.getServer(Foo0.class, + new Foo0Impl(), ADDRESS, 0, 2, false, conf, null); + server.addProtocol(Foo1.class, new Foo1Impl()); + server.addProtocol(Bar.class, new BarImpl()); + server.addProtocol(Mixin.class, new BarImpl()); + server.start(); + addr = NetUtils.getConnectAddress(server); + } + + @After + public void tearDown() throws Exception { + server.stop(); + } + + @Test + public void test1() throws IOException { + ProtocolProxy proxy; + proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf); + + Foo0 foo0 = (Foo0)proxy.getProxy(); + Assert.assertEquals("Foo0", foo0.ping()); + + + proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf); + + + Foo1 foo1 = (Foo1)proxy.getProxy(); + Assert.assertEquals("Foo1", foo1.ping()); + Assert.assertEquals("Foo1", foo1.ping()); + + + proxy = RPC.getProtocolProxy(Bar.class, Foo1.versionID, addr, conf); + + + Bar bar = (Bar)proxy.getProxy(); + Assert.assertEquals(99, bar.echo(99)); + + // Now test Mixin class method + + Mixin mixin = bar; + mixin.hello(); + } + + + // Server does not implement the FooUnimplemented version of protocol Foo. + // See that calls to it fail. + @Test(expected=IOException.class) + public void testNonExistingProtocol() throws IOException { + ProtocolProxy proxy; + proxy = RPC.getProtocolProxy(FooUnimplemented.class, + FooUnimplemented.versionID, addr, conf); + + FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); + foo.ping(); + } + + + /** + * getProtocolVersion of an unimplemented version should return highest version + * Similarly getProtocolSignature should work. + * @throws IOException + */ + @Test + public void testNonExistingProtocol2() throws IOException { + ProtocolProxy proxy; + proxy = RPC.getProtocolProxy(FooUnimplemented.class, + FooUnimplemented.versionID, addr, conf); + + FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); + Assert.assertEquals(Foo1.versionID, + foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class), + FooUnimplemented.versionID)); + foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class), + FooUnimplemented.versionID, 0); + } + + @Test(expected=IOException.class) + public void testIncorrectServerCreation() throws IOException { + RPC.getServer(Foo1.class, + new Foo0Impl(), ADDRESS, 0, 2, false, conf, null); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java index 02ca2afe42a..85e60dde9fa 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java @@ -39,7 +39,7 @@ import org.junit.Test; public class TestRPCCompatibility { private static final String ADDRESS = "0.0.0.0"; private static InetSocketAddress addr; - private static Server server; + private static RPC.Server server; private ProtocolProxy proxy; public static final Log LOG = @@ -52,10 +52,12 @@ public class TestRPCCompatibility { void ping() throws IOException; } - public interface TestProtocol1 extends TestProtocol0 { + public interface TestProtocol1 extends VersionedProtocol, TestProtocol0 { String echo(String value) throws IOException; } + @ProtocolInfo(protocolName= + "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1") public interface TestProtocol2 extends TestProtocol1 { int echo(int value) throws IOException; } @@ -89,11 +91,23 @@ public class TestRPCCompatibility { public static class TestImpl1 extends TestImpl0 implements TestProtocol1 { @Override public String echo(String value) { return value; } + @Override + public long getProtocolVersion(String protocol, + long clientVersion) throws IOException { + return TestProtocol1.versionID; + } } public static class TestImpl2 extends TestImpl1 implements TestProtocol2 { @Override public int echo(int value) { return value; } + + @Override + public long getProtocolVersion(String protocol, + long clientVersion) throws IOException { + return TestProtocol2.versionID; + } + } @After @@ -109,8 +123,10 @@ public class TestRPCCompatibility { @Test // old client vs new server public void testVersion0ClientVersion1Server() throws Exception { // create a server with two handlers + TestImpl1 impl = new TestImpl1(); server = RPC.getServer(TestProtocol1.class, - new TestImpl1(), ADDRESS, 0, 2, false, conf, null); + impl, ADDRESS, 0, 2, false, conf, null); + server.addProtocol(TestProtocol0.class, impl); server.start(); addr = NetUtils.getConnectAddress(server); @@ -172,8 +188,10 @@ public class TestRPCCompatibility { @Test // Compatible new client & old server public void testVersion2ClientVersion1Server() throws Exception { // create a server with two handlers + TestImpl1 impl = new TestImpl1(); server = RPC.getServer(TestProtocol1.class, - new TestImpl1(), ADDRESS, 0, 2, false, conf, null); + impl, ADDRESS, 0, 2, false, conf, null); + server.addProtocol(TestProtocol0.class, impl); server.start(); addr = NetUtils.getConnectAddress(server); @@ -190,8 +208,10 @@ public class TestRPCCompatibility { @Test // equal version client and server public void testVersion2ClientVersion2Server() throws Exception { // create a server with two handlers + TestImpl2 impl = new TestImpl2(); server = RPC.getServer(TestProtocol2.class, - new TestImpl2(), ADDRESS, 0, 2, false, conf, null); + impl, ADDRESS, 0, 2, false, conf, null); + server.addProtocol(TestProtocol0.class, impl); server.start(); addr = NetUtils.getConnectAddress(server); @@ -250,14 +270,16 @@ public class TestRPCCompatibility { assertEquals(hash1, hash2); } + @ProtocolInfo(protocolName= + "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1") public interface TestProtocol4 extends TestProtocol2 { - public static final long versionID = 1L; + public static final long versionID = 4L; int echo(int value) throws IOException; } @Test public void testVersionMismatch() throws IOException { - server = RPC.getServer(TestProtocol2.class, new TestImpl0(), ADDRESS, 0, 2, + server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2, false, conf, null); server.start(); addr = NetUtils.getConnectAddress(server); @@ -268,7 +290,8 @@ public class TestRPCCompatibility { proxy.echo(21); fail("The call must throw VersionMismatch exception"); } catch (IOException ex) { - Assert.assertTrue(ex.getMessage().contains("VersionMismatch")); + Assert.assertTrue("Expected version mismatch but got " + ex.getMessage(), + ex.getMessage().contains("VersionMismatch")); } } } \ No newline at end of file