HADOOP-7524 and MapReduce-2887 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1164771 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanjay Radia 2011-09-03 00:31:05 +00:00
parent 2566043c2f
commit b97a4d40c8
17 changed files with 691 additions and 72 deletions

View File

@ -5,6 +5,7 @@ Trunk (unreleased changes)
IMPROVEMENTS IMPROVEMENTS
HADOOP-7595. Upgrade dependency to Avro 1.5.3. (Alejandro Abdelnur via atm) HADOOP-7595. Upgrade dependency to Avro 1.5.3. (Alejandro Abdelnur via atm)
HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
Release 0.23.0 - Unreleased Release 0.23.0 - Unreleased

View File

@ -285,8 +285,8 @@ public class Client {
authMethod = AuthMethod.KERBEROS; authMethod = AuthMethod.KERBEROS;
} }
header = new ConnectionHeader(protocol == null ? null : protocol header =
.getName(), ticket, authMethod); new ConnectionHeader(RPC.getProtocolName(protocol), ticket, authMethod);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Use " + authMethod + " authentication for protocol " LOG.debug("Use " + authMethod + " authentication for protocol "

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
/**
* The protocol name that is used when a client and server connect.
* By default the class name of the protocol interface is the protocol name.
*
* Why override the default name (i.e. the class name)?
* One use case overriding the default name (i.e. the class name) is when
* there are multiple implementations of the same protocol, each with say a
* different version/serialization.
* In Hadoop this is used to allow multiple server and client adapters
* for different versions of the same protocol service.
*/
@Retention(RetentionPolicy.RUNTIME)
public @interface ProtocolInfo {
String protocolName(); // the name of the protocol (i.e. rpc service)
}

View File

@ -63,6 +63,20 @@ import org.apache.hadoop.util.ReflectionUtils;
public class RPC { public class RPC {
static final Log LOG = LogFactory.getLog(RPC.class); static final Log LOG = LogFactory.getLog(RPC.class);
/**
* Get the protocol name.
* If the protocol class has a ProtocolAnnotation, then get the protocol
* name from the annotation; otherwise the class name is the protocol name.
*/
static public String getProtocolName(Class<?> protocol) {
if (protocol == null) {
return null;
}
ProtocolInfo anno = (ProtocolInfo) protocol.getAnnotation(ProtocolInfo.class);
return (anno == null) ? protocol.getName() : anno.protocolName();
}
private RPC() {} // no public ctor private RPC() {} // no public ctor
// cache of RpcEngines by protocol // cache of RpcEngines by protocol
@ -553,8 +567,10 @@ public class RPC {
} }
/** Construct a server for a protocol implementation instance. */ /** Construct a server for a protocol implementation instance. */
public static Server getServer(Class<?> protocol,
Object instance, String bindAddress, int port, public static <PROTO extends VersionedProtocol, IMPL extends PROTO>
Server getServer(Class<PROTO> protocol,
IMPL instance, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler, int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, Configuration conf, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager) SecretManager<? extends TokenIdentifier> secretManager)
@ -576,6 +592,18 @@ public class RPC {
super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler, super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
conf, serverName, secretManager); conf, serverName, secretManager);
} }
/**
* Add a protocol to the existing server.
* @param protocolClass - the protocol class
* @param protocolImpl - the impl of the protocol that will be called
* @return the server (for convenience)
*/
public <PROTO extends VersionedProtocol, IMPL extends PROTO>
Server addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl
) throws IOException {
throw new IOException("addProtocol Not Implemented");
}
} }
} }

View File

@ -900,7 +900,7 @@ public abstract class Server {
private InetAddress addr; private InetAddress addr;
ConnectionHeader header = new ConnectionHeader(); ConnectionHeader header = new ConnectionHeader();
Class<?> protocol; String protocolName;
boolean useSasl; boolean useSasl;
SaslServer saslServer; SaslServer saslServer;
private AuthMethod authMethod; private AuthMethod authMethod;
@ -1287,15 +1287,8 @@ public abstract class Server {
DataInputStream in = DataInputStream in =
new DataInputStream(new ByteArrayInputStream(buf)); new DataInputStream(new ByteArrayInputStream(buf));
header.readFields(in); header.readFields(in);
try { protocolName = header.getProtocol();
String protocolClassName = header.getProtocol();
if (protocolClassName != null) {
protocol = getProtocolClass(header.getProtocol(), conf);
rpcDetailedMetrics.init(protocol);
}
} catch (ClassNotFoundException cnfe) {
throw new IOException("Unknown protocol: " + header.getProtocol());
}
UserGroupInformation protocolUser = header.getUgi(); UserGroupInformation protocolUser = header.getUgi();
if (!useSasl) { if (!useSasl) {
@ -1484,7 +1477,7 @@ public abstract class Server {
// Make the call as the user via Subject.doAs, thus associating // Make the call as the user via Subject.doAs, thus associating
// the call with the Subject // the call with the Subject
if (call.connection.user == null) { if (call.connection.user == null) {
value = call(call.connection.protocol, call.param, value = call(call.connection.protocolName, call.param,
call.timestamp); call.timestamp);
} else { } else {
value = value =
@ -1493,7 +1486,7 @@ public abstract class Server {
@Override @Override
public Writable run() throws Exception { public Writable run() throws Exception {
// make the call // make the call
return call(call.connection.protocol, return call(call.connection.protocolName,
call.param, call.timestamp); call.param, call.timestamp);
} }
@ -1753,7 +1746,7 @@ public abstract class Server {
/** /**
* Called for each call. * Called for each call.
* @deprecated Use {@link #call(Class, Writable, long)} instead * @deprecated Use {@link #call(String, Writable, long)} instead
*/ */
@Deprecated @Deprecated
public Writable call(Writable param, long receiveTime) throws IOException { public Writable call(Writable param, long receiveTime) throws IOException {
@ -1761,7 +1754,7 @@ public abstract class Server {
} }
/** Called for each call. */ /** Called for each call. */
public abstract Writable call(Class<?> protocol, public abstract Writable call(String protocol,
Writable param, long receiveTime) Writable param, long receiveTime)
throws IOException; throws IOException;

View File

@ -34,7 +34,6 @@ public interface VersionedProtocol {
* @return the version that the server will speak * @return the version that the server will speak
* @throws IOException if any IO error occurs * @throws IOException if any IO error occurs
*/ */
@Deprecated
public long getProtocolVersion(String protocol, public long getProtocolVersion(String protocol,
long clientVersion) throws IOException; long clientVersion) throws IOException;

View File

@ -27,6 +27,9 @@ import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.io.*; import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
@ -35,6 +38,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.*; import org.apache.commons.logging.*;
import org.apache.hadoop.io.*; import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
@ -47,9 +51,45 @@ import org.apache.hadoop.conf.*;
public class WritableRpcEngine implements RpcEngine { public class WritableRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class); private static final Log LOG = LogFactory.getLog(RPC.class);
/**
* Get all superInterfaces that extend VersionedProtocol
* @param childInterfaces
* @return the super interfaces that extend VersionedProtocol
*/
private static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
for (Class<?> childInterface : childInterfaces) {
if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
allInterfaces.add(childInterface);
allInterfaces.addAll(
Arrays.asList(
getSuperInterfaces(childInterface.getInterfaces())));
} else {
LOG.warn("Interface " + childInterface +
" ignored because it does not extend VersionedProtocol");
}
}
return (Class<?>[]) allInterfaces.toArray(new Class[allInterfaces.size()]);
}
/**
* Get all interfaces that the given protocol implements or extends
* which are assignable from VersionedProtocol.
*/
private static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
Class<?>[] interfaces = protocol.getInterfaces();
return getSuperInterfaces(interfaces);
}
//writableRpcVersion should be updated if there is a change //writableRpcVersion should be updated if there is a change
//in format of the rpc messages. //in format of the rpc messages.
public static long writableRpcVersion = 1L;
// 2L - added declared class to Invocation
public static final long writableRpcVersion = 2L;
/** A method invocation, including the method name and its parameters.*/ /** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable, Configurable { private static class Invocation implements Writable, Configurable {
@ -59,11 +99,13 @@ public class WritableRpcEngine implements RpcEngine {
private Configuration conf; private Configuration conf;
private long clientVersion; private long clientVersion;
private int clientMethodsHash; private int clientMethodsHash;
private String declaringClassProtocolName;
//This could be different from static writableRpcVersion when received //This could be different from static writableRpcVersion when received
//at server, if client is using a different version. //at server, if client is using a different version.
private long rpcVersion; private long rpcVersion;
@SuppressWarnings("unused") // called when deserializing an invocation
public Invocation() {} public Invocation() {}
public Invocation(Method method, Object[] parameters) { public Invocation(Method method, Object[] parameters) {
@ -88,6 +130,8 @@ public class WritableRpcEngine implements RpcEngine {
this.clientMethodsHash = ProtocolSignature.getFingerprint(method this.clientMethodsHash = ProtocolSignature.getFingerprint(method
.getDeclaringClass().getMethods()); .getDeclaringClass().getMethods());
} }
this.declaringClassProtocolName =
RPC.getProtocolName(method.getDeclaringClass());
} }
/** The name of the method invoked. */ /** The name of the method invoked. */
@ -103,6 +147,7 @@ public class WritableRpcEngine implements RpcEngine {
return clientVersion; return clientVersion;
} }
@SuppressWarnings("unused")
private int getClientMethodsHash() { private int getClientMethodsHash() {
return clientMethodsHash; return clientMethodsHash;
} }
@ -115,8 +160,10 @@ public class WritableRpcEngine implements RpcEngine {
return rpcVersion; return rpcVersion;
} }
@SuppressWarnings("deprecation")
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
rpcVersion = in.readLong(); rpcVersion = in.readLong();
declaringClassProtocolName = UTF8.readString(in);
methodName = UTF8.readString(in); methodName = UTF8.readString(in);
clientVersion = in.readLong(); clientVersion = in.readLong();
clientMethodsHash = in.readInt(); clientMethodsHash = in.readInt();
@ -124,13 +171,16 @@ public class WritableRpcEngine implements RpcEngine {
parameterClasses = new Class[parameters.length]; parameterClasses = new Class[parameters.length];
ObjectWritable objectWritable = new ObjectWritable(); ObjectWritable objectWritable = new ObjectWritable();
for (int i = 0; i < parameters.length; i++) { for (int i = 0; i < parameters.length; i++) {
parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf); parameters[i] =
ObjectWritable.readObject(in, objectWritable, this.conf);
parameterClasses[i] = objectWritable.getDeclaredClass(); parameterClasses[i] = objectWritable.getDeclaredClass();
} }
} }
@SuppressWarnings("deprecation")
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
out.writeLong(rpcVersion); out.writeLong(rpcVersion);
UTF8.writeString(out, declaringClassProtocolName);
UTF8.writeString(out, methodName); UTF8.writeString(out, methodName);
out.writeLong(clientVersion); out.writeLong(clientVersion);
out.writeInt(clientMethodsHash); out.writeInt(clientMethodsHash);
@ -273,30 +323,161 @@ public class WritableRpcEngine implements RpcEngine {
/** Construct a server for a protocol implementation instance listening on a /** Construct a server for a protocol implementation instance listening on a
* port and address. */ * port and address. */
public Server getServer(Class<?> protocol, public RPC.Server getServer(Class<?> protocolClass,
Object instance, String bindAddress, int port, Object protocolImpl, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler, int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, Configuration conf, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager) SecretManager<? extends TokenIdentifier> secretManager)
throws IOException { throws IOException {
return new Server(instance, conf, bindAddress, port, numHandlers, return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
numReaders, queueSizePerHandler, verbose, secretManager); numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
} }
/** An RPC Server. */ /** An RPC Server. */
public static class Server extends RPC.Server { public static class Server extends RPC.Server {
private Object instance;
private boolean verbose; private boolean verbose;
/**
* The key in Map
*/
static class ProtoNameVer {
final String protocol;
final long version;
ProtoNameVer(String protocol, long ver) {
this.protocol = protocol;
this.version = ver;
}
@Override
public boolean equals(Object o) {
if (o == null)
return false;
if (this == o)
return true;
if (! (o instanceof ProtoNameVer))
return false;
ProtoNameVer pv = (ProtoNameVer) o;
return ((pv.protocol.equals(this.protocol)) &&
(pv.version == this.version));
}
@Override
public int hashCode() {
return protocol.hashCode() * 37 + (int) version;
}
}
/**
* The value in map
*/
static class ProtoClassProtoImpl {
final Class<?> protocolClass;
final Object protocolImpl;
ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
this.protocolClass = protocolClass;
this.protocolImpl = protocolImpl;
}
}
private Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMap =
new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);
// Register protocol and its impl for rpc calls
private void registerProtocolAndImpl(Class<?> protocolClass,
Object protocolImpl) throws IOException {
String protocolName = RPC.getProtocolName(protocolClass);
VersionedProtocol vp = (VersionedProtocol) protocolImpl;
long version;
try {
version = vp.getProtocolVersion(protocolName, 0);
} catch (Exception ex) {
LOG.warn("Protocol " + protocolClass +
" NOT registered as getProtocolVersion throws exception ");
return;
}
protocolImplMap.put(new ProtoNameVer(protocolName, version),
new ProtoClassProtoImpl(protocolClass, protocolImpl));
LOG.info("ProtocolImpl=" + protocolImpl.getClass().getName() +
" protocolClass=" + protocolClass.getName() + " version=" + version);
}
private static class VerProtocolImpl {
final long version;
final ProtoClassProtoImpl protocolTarget;
VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
this.version = ver;
this.protocolTarget = protocolTarget;
}
}
@SuppressWarnings("unused") // will be useful later.
private VerProtocolImpl[] getSupportedProtocolVersions(
String protocolName) {
VerProtocolImpl[] resultk = new VerProtocolImpl[protocolImplMap.size()];
int i = 0;
for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
protocolImplMap.entrySet()) {
if (pv.getKey().protocol.equals(protocolName)) {
resultk[i++] =
new VerProtocolImpl(pv.getKey().version, pv.getValue());
}
}
if (i == 0) {
return null;
}
VerProtocolImpl[] result = new VerProtocolImpl[i];
System.arraycopy(resultk, 0, result, 0, i);
return result;
}
private VerProtocolImpl getHighestSupportedProtocol(String protocolName) {
Long highestVersion = 0L;
ProtoClassProtoImpl highest = null;
for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : protocolImplMap
.entrySet()) {
if (pv.getKey().protocol.equals(protocolName)) {
if ((highest == null) || (pv.getKey().version > highestVersion)) {
highest = pv.getValue();
highestVersion = pv.getKey().version;
}
}
}
if (highest == null) {
return null;
}
return new VerProtocolImpl(highestVersion, highest);
}
/** Construct an RPC server. /** Construct an RPC server.
* @param instance the instance whose methods will be called * @param instance the instance whose methods will be called
* @param conf the configuration to use * @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection * @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on * @param port the port to listen for connections on
*
* @deprecated Use #Server(Class, Object, Configuration, String, int)
*
*/ */
public Server(Object instance, Configuration conf, String bindAddress, int port) @Deprecated
public Server(Object instance, Configuration conf, String bindAddress,
int port)
throws IOException { throws IOException {
this(instance, conf, bindAddress, port, 1, -1, -1, false, null); this(null, instance, conf, bindAddress, port);
}
/** Construct an RPC server.
* @param protocol class
* @param instance the instance whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
*/
public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port)
throws IOException {
this(protocolClass, protocolImpl, conf, bindAddress, port, 1, -1, -1,
false, null);
} }
private static String classNameBase(String className) { private static String classNameBase(String className) {
@ -307,35 +488,103 @@ public class WritableRpcEngine implements RpcEngine {
return names[names.length-1]; return names[names.length-1];
} }
/** Construct an RPC server. /** Construct an RPC server.
* @param instance the instance whose methods will be called * @param protocolImpl the instance whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged
*
* @deprecated use Server#Server(Class, Object,
* Configuration, String, int, int, int, int, boolean, SecretManager)
*/
@Deprecated
public Server(Object protocolImpl, Configuration conf, String bindAddress,
int port, int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
this(null, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose,
secretManager);
}
/** Construct an RPC server.
* @param protocolClass - the protocol being registered
* can be null for compatibility with old usage (see below for details)
* @param protocolImpl the protocol impl that will be called
* @param conf the configuration to use * @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection * @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on * @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run * @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged * @param verbose whether each call should be logged
*/ */
public Server(Object instance, Configuration conf, String bindAddress, int port, public Server(Class<?> protocolClass, Object protocolImpl,
int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, String bindAddress, int port,
SecretManager<? extends TokenIdentifier> secretManager) int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException { throws IOException {
super(bindAddress, port, Invocation.class, numHandlers, numReaders, super(bindAddress, port, Invocation.class, numHandlers, numReaders,
queueSizePerHandler, conf, queueSizePerHandler, conf,
classNameBase(instance.getClass().getName()), secretManager); classNameBase(protocolImpl.getClass().getName()), secretManager);
this.instance = instance;
this.verbose = verbose; this.verbose = verbose;
Class<?>[] protocols;
if (protocolClass == null) { // derive protocol from impl
/*
* In order to remain compatible with the old usage where a single
* target protocolImpl is suppled for all protocol interfaces, and
* the protocolImpl is derived from the protocolClass(es)
* we register all interfaces extended by the protocolImpl
*/
protocols = getProtocolInterfaces(protocolImpl.getClass());
} else {
if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
throw new IOException("protocolClass "+ protocolClass +
" is not implemented by protocolImpl which is of class " +
protocolImpl.getClass());
}
// register protocol class and its super interfaces
registerProtocolAndImpl(protocolClass, protocolImpl);
protocols = getProtocolInterfaces(protocolClass);
}
for (Class<?> p : protocols) {
if (!p.equals(VersionedProtocol.class)) {
registerProtocolAndImpl(p, protocolImpl);
}
} }
public Writable call(Class<?> protocol, Writable param, long receivedTime) }
@Override
public <PROTO extends VersionedProtocol, IMPL extends PROTO> Server
addProtocol(
Class<PROTO> protocolClass, IMPL protocolImpl) throws IOException {
registerProtocolAndImpl(protocolClass, protocolImpl);
return this;
}
/**
* Process a client call
* @param protocolName - the protocol name (the class of the client proxy
* used to make calls to the rpc server.
* @param param parameters
* @param receivedTime time at which the call receoved (for metrics)
* @return the call's return
* @throws IOException
*/
public Writable call(String protocolName, Writable param, long receivedTime)
throws IOException { throws IOException {
try { try {
Invocation call = (Invocation)param; Invocation call = (Invocation)param;
if (verbose) log("Call: " + call); if (verbose) log("Call: " + call);
Method method = protocol.getMethod(call.getMethodName(),
call.getParameterClasses());
method.setAccessible(true);
// Verify rpc version // Verify rpc version
if (call.getRpcVersion() != writableRpcVersion) { if (call.getRpcVersion() != writableRpcVersion) {
// Client is using a different version of WritableRpc // Client is using a different version of WritableRpc
@ -345,24 +594,50 @@ public class WritableRpcEngine implements RpcEngine {
+ writableRpcVersion); + writableRpcVersion);
} }
//Verify protocol version.
//Bypass the version check for VersionedProtocol
if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
long clientVersion = call.getProtocolVersion(); long clientVersion = call.getProtocolVersion();
ProtocolSignature serverInfo = ((VersionedProtocol) instance) final String protoName;
.getProtocolSignature(protocol.getCanonicalName(), call ProtoClassProtoImpl protocolImpl;
.getProtocolVersion(), call.getClientMethodsHash()); if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
long serverVersion = serverInfo.getVersion(); // VersionProtocol methods are often used by client to figure out
if (serverVersion != clientVersion) { // which version of protocol to use.
LOG.warn("Version mismatch: client version=" + clientVersion //
+ ", server version=" + serverVersion); // Versioned protocol methods should go the protocolName protocol
throw new RPC.VersionMismatch(protocol.getName(), clientVersion, // rather than the declaring class of the method since the
serverVersion); // the declaring class is VersionedProtocol which is not
// registered directly.
// Send the call to the highest protocol version
protocolImpl =
getHighestSupportedProtocol(protocolName).protocolTarget;
} else {
protoName = call.declaringClassProtocolName;
// Find the right impl for the protocol based on client version.
ProtoNameVer pv =
new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
protocolImpl = protocolImplMap.get(pv);
if (protocolImpl == null) { // no match for Protocol AND Version
VerProtocolImpl highest =
getHighestSupportedProtocol(protoName);
if (highest == null) {
throw new IOException("Unknown protocol: " + protoName);
} else { // protocol supported but not the version that client wants
throw new RPC.VersionMismatch(protoName, clientVersion,
highest.version);
}
} }
} }
// Invoke the protocol method
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
Object value = method.invoke(instance, call.getParameters()); Method method =
protocolImpl.protocolClass.getMethod(call.getMethodName(),
call.getParameterClasses());
method.setAccessible(true);
rpcDetailedMetrics.init(protocolImpl.protocolClass);
Object value =
method.invoke(protocolImpl.protocolImpl, call.getParameters());
int processingTime = (int) (System.currentTimeMillis() - startTime); int processingTime = (int) (System.currentTimeMillis() - startTime);
int qTime = (int) (startTime-receivedTime); int qTime = (int) (startTime-receivedTime);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {

View File

@ -97,7 +97,7 @@ public class TestIPC {
} }
@Override @Override
public Writable call(Class<?> protocol, Writable param, long receiveTime) public Writable call(String protocol, Writable param, long receiveTime)
throws IOException { throws IOException {
if (sleep) { if (sleep) {
// sleep a bit // sleep a bit

View File

@ -72,7 +72,7 @@ public class TestIPCServerResponder extends TestCase {
} }
@Override @Override
public Writable call(Class<?> protocol, Writable param, long receiveTime) public Writable call(String protocol, Writable param, long receiveTime)
throws IOException { throws IOException {
if (sleep) { if (sleep) {
try { try {

View File

@ -0,0 +1,255 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
public class TestMultipleProtocolServer {
private static final String ADDRESS = "0.0.0.0";
private static InetSocketAddress addr;
private static RPC.Server server;
private static Configuration conf = new Configuration();
@ProtocolInfo(protocolName="Foo")
interface Foo0 extends VersionedProtocol {
public static final long versionID = 0L;
String ping() throws IOException;
}
@ProtocolInfo(protocolName="Foo")
interface Foo1 extends VersionedProtocol {
public static final long versionID = 1L;
String ping() throws IOException;
String ping2() throws IOException;
}
@ProtocolInfo(protocolName="Foo")
interface FooUnimplemented extends VersionedProtocol {
public static final long versionID = 2L;
String ping() throws IOException;
}
interface Mixin extends VersionedProtocol{
public static final long versionID = 0L;
void hello() throws IOException;
}
interface Bar extends Mixin, VersionedProtocol {
public static final long versionID = 0L;
int echo(int i) throws IOException;
}
class Foo0Impl implements Foo0 {
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return Foo0.versionID;
}
@SuppressWarnings("unchecked")
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
Class<? extends VersionedProtocol> inter;
try {
inter = (Class<? extends VersionedProtocol>)getClass().
getGenericInterfaces()[0];
} catch (Exception e) {
throw new IOException(e);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
getProtocolVersion(protocol, clientVersion), inter);
}
@Override
public String ping() {
return "Foo0";
}
}
class Foo1Impl implements Foo1 {
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return Foo1.versionID;
}
@SuppressWarnings("unchecked")
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
Class<? extends VersionedProtocol> inter;
try {
inter = (Class<? extends VersionedProtocol>)getClass().
getGenericInterfaces()[0];
} catch (Exception e) {
throw new IOException(e);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
getProtocolVersion(protocol, clientVersion), inter);
}
@Override
public String ping() {
return "Foo1";
}
@Override
public String ping2() {
return "Foo1";
}
}
class BarImpl implements Bar {
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return Bar.versionID;
}
@SuppressWarnings("unchecked")
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
Class<? extends VersionedProtocol> inter;
try {
inter = (Class<? extends VersionedProtocol>)getClass().
getGenericInterfaces()[0];
} catch (Exception e) {
throw new IOException(e);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
getProtocolVersion(protocol, clientVersion), inter);
}
@Override
public int echo(int i) {
return i;
}
@Override
public void hello() {
}
}
@Before
public void setUp() throws Exception {
// create a server with two handlers
server = RPC.getServer(Foo0.class,
new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
server.addProtocol(Foo1.class, new Foo1Impl());
server.addProtocol(Bar.class, new BarImpl());
server.addProtocol(Mixin.class, new BarImpl());
server.start();
addr = NetUtils.getConnectAddress(server);
}
@After
public void tearDown() throws Exception {
server.stop();
}
@Test
public void test1() throws IOException {
ProtocolProxy<?> proxy;
proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf);
Foo0 foo0 = (Foo0)proxy.getProxy();
Assert.assertEquals("Foo0", foo0.ping());
proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf);
Foo1 foo1 = (Foo1)proxy.getProxy();
Assert.assertEquals("Foo1", foo1.ping());
Assert.assertEquals("Foo1", foo1.ping());
proxy = RPC.getProtocolProxy(Bar.class, Foo1.versionID, addr, conf);
Bar bar = (Bar)proxy.getProxy();
Assert.assertEquals(99, bar.echo(99));
// Now test Mixin class method
Mixin mixin = bar;
mixin.hello();
}
// Server does not implement the FooUnimplemented version of protocol Foo.
// See that calls to it fail.
@Test(expected=IOException.class)
public void testNonExistingProtocol() throws IOException {
ProtocolProxy<?> proxy;
proxy = RPC.getProtocolProxy(FooUnimplemented.class,
FooUnimplemented.versionID, addr, conf);
FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
foo.ping();
}
/**
* getProtocolVersion of an unimplemented version should return highest version
* Similarly getProtocolSignature should work.
* @throws IOException
*/
@Test
public void testNonExistingProtocol2() throws IOException {
ProtocolProxy<?> proxy;
proxy = RPC.getProtocolProxy(FooUnimplemented.class,
FooUnimplemented.versionID, addr, conf);
FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
Assert.assertEquals(Foo1.versionID,
foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class),
FooUnimplemented.versionID));
foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class),
FooUnimplemented.versionID, 0);
}
@Test(expected=IOException.class)
public void testIncorrectServerCreation() throws IOException {
RPC.getServer(Foo1.class,
new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
}
}

View File

@ -39,7 +39,7 @@ import org.junit.Test;
public class TestRPCCompatibility { public class TestRPCCompatibility {
private static final String ADDRESS = "0.0.0.0"; private static final String ADDRESS = "0.0.0.0";
private static InetSocketAddress addr; private static InetSocketAddress addr;
private static Server server; private static RPC.Server server;
private ProtocolProxy<?> proxy; private ProtocolProxy<?> proxy;
public static final Log LOG = public static final Log LOG =
@ -52,10 +52,12 @@ public class TestRPCCompatibility {
void ping() throws IOException; void ping() throws IOException;
} }
public interface TestProtocol1 extends TestProtocol0 { public interface TestProtocol1 extends VersionedProtocol, TestProtocol0 {
String echo(String value) throws IOException; String echo(String value) throws IOException;
} }
@ProtocolInfo(protocolName=
"org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
public interface TestProtocol2 extends TestProtocol1 { public interface TestProtocol2 extends TestProtocol1 {
int echo(int value) throws IOException; int echo(int value) throws IOException;
} }
@ -89,11 +91,23 @@ public class TestRPCCompatibility {
public static class TestImpl1 extends TestImpl0 implements TestProtocol1 { public static class TestImpl1 extends TestImpl0 implements TestProtocol1 {
@Override @Override
public String echo(String value) { return value; } public String echo(String value) { return value; }
@Override
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException {
return TestProtocol1.versionID;
}
} }
public static class TestImpl2 extends TestImpl1 implements TestProtocol2 { public static class TestImpl2 extends TestImpl1 implements TestProtocol2 {
@Override @Override
public int echo(int value) { return value; } public int echo(int value) { return value; }
@Override
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException {
return TestProtocol2.versionID;
}
} }
@After @After
@ -109,8 +123,10 @@ public class TestRPCCompatibility {
@Test // old client vs new server @Test // old client vs new server
public void testVersion0ClientVersion1Server() throws Exception { public void testVersion0ClientVersion1Server() throws Exception {
// create a server with two handlers // create a server with two handlers
TestImpl1 impl = new TestImpl1();
server = RPC.getServer(TestProtocol1.class, server = RPC.getServer(TestProtocol1.class,
new TestImpl1(), ADDRESS, 0, 2, false, conf, null); impl, ADDRESS, 0, 2, false, conf, null);
server.addProtocol(TestProtocol0.class, impl);
server.start(); server.start();
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
@ -172,8 +188,10 @@ public class TestRPCCompatibility {
@Test // Compatible new client & old server @Test // Compatible new client & old server
public void testVersion2ClientVersion1Server() throws Exception { public void testVersion2ClientVersion1Server() throws Exception {
// create a server with two handlers // create a server with two handlers
TestImpl1 impl = new TestImpl1();
server = RPC.getServer(TestProtocol1.class, server = RPC.getServer(TestProtocol1.class,
new TestImpl1(), ADDRESS, 0, 2, false, conf, null); impl, ADDRESS, 0, 2, false, conf, null);
server.addProtocol(TestProtocol0.class, impl);
server.start(); server.start();
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
@ -190,8 +208,10 @@ public class TestRPCCompatibility {
@Test // equal version client and server @Test // equal version client and server
public void testVersion2ClientVersion2Server() throws Exception { public void testVersion2ClientVersion2Server() throws Exception {
// create a server with two handlers // create a server with two handlers
TestImpl2 impl = new TestImpl2();
server = RPC.getServer(TestProtocol2.class, server = RPC.getServer(TestProtocol2.class,
new TestImpl2(), ADDRESS, 0, 2, false, conf, null); impl, ADDRESS, 0, 2, false, conf, null);
server.addProtocol(TestProtocol0.class, impl);
server.start(); server.start();
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
@ -250,14 +270,16 @@ public class TestRPCCompatibility {
assertEquals(hash1, hash2); assertEquals(hash1, hash2);
} }
@ProtocolInfo(protocolName=
"org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
public interface TestProtocol4 extends TestProtocol2 { public interface TestProtocol4 extends TestProtocol2 {
public static final long versionID = 1L; public static final long versionID = 4L;
int echo(int value) throws IOException; int echo(int value) throws IOException;
} }
@Test @Test
public void testVersionMismatch() throws IOException { public void testVersionMismatch() throws IOException {
server = RPC.getServer(TestProtocol2.class, new TestImpl0(), ADDRESS, 0, 2, server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2,
false, conf, null); false, conf, null);
server.start(); server.start();
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
@ -268,7 +290,8 @@ public class TestRPCCompatibility {
proxy.echo(21); proxy.echo(21);
fail("The call must throw VersionMismatch exception"); fail("The call must throw VersionMismatch exception");
} catch (IOException ex) { } catch (IOException ex) {
Assert.assertTrue(ex.getMessage().contains("VersionMismatch")); Assert.assertTrue("Expected version mismatch but got " + ex.getMessage(),
ex.getMessage().contains("VersionMismatch"));
} }
} }
} }

View File

@ -5,6 +5,10 @@ Trunk (unreleased changes)
HDFS-395. DFS Scalability: Incremental block reports. (Tomasz Nykiel HDFS-395. DFS Scalability: Incremental block reports. (Tomasz Nykiel
via hairong) via hairong)
IMPROVEMENTS
HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
BUG FIXES BUG FIXES
HDFS-2287. TestParallelRead has a small off-by-one bug. (todd) HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)

View File

@ -98,7 +98,7 @@ public class TestDFSClientRetries extends TestCase {
} }
@Override @Override
public Writable call(Class<?> protocol, Writable param, long receiveTime) public Writable call(String protocol, Writable param, long receiveTime)
throws IOException { throws IOException {
if (sleep) { if (sleep) {
// sleep a bit // sleep a bit

View File

@ -87,7 +87,7 @@ public class TestInterDatanodeProtocol {
} }
@Override @Override
public Writable call(Class<?> protocol, Writable param, long receiveTime) public Writable call(String protocol, Writable param, long receiveTime)
throws IOException { throws IOException {
if (sleep) { if (sleep) {
// sleep a bit // sleep a bit

View File

@ -1,6 +1,9 @@
Hadoop MapReduce Change Log Hadoop MapReduce Change Log
Trunk (unreleased changes) Trunk (unreleased changes)
IMPROVEMENTS
MAPREDUCE-2887 due to HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
Release 0.23.0 - Unreleased Release 0.23.0 - Unreleased

View File

@ -317,15 +317,15 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
} }
@Override @Override
public Writable call(Class<?> protocol, Writable writableRequest, public Writable call(String protocol, Writable writableRequest,
long receiveTime) throws IOException { long receiveTime) throws IOException {
ProtoSpecificRequestWritable request = (ProtoSpecificRequestWritable) writableRequest; ProtoSpecificRequestWritable request = (ProtoSpecificRequestWritable) writableRequest;
ProtoSpecificRpcRequest rpcRequest = request.message; ProtoSpecificRpcRequest rpcRequest = request.message;
String methodName = rpcRequest.getMethodName(); String methodName = rpcRequest.getMethodName();
System.out.println("Call: protocol=" + protocol.getCanonicalName() + ", method=" System.out.println("Call: protocol=" + protocol + ", method="
+ methodName); + methodName);
if (verbose) if (verbose)
log("Call: protocol=" + protocol.getCanonicalName() + ", method=" log("Call: protocol=" + protocol + ", method="
+ methodName); + methodName);
MethodDescriptor methodDescriptor = service.getDescriptorForType() MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName); .findMethodByName(methodName);

View File

@ -1483,7 +1483,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf); taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
int handlerCount = conf.getInt(JT_IPC_HANDLER_COUNT, 10); int handlerCount = conf.getInt(JT_IPC_HANDLER_COUNT, 10);
this.interTrackerServer = RPC.getServer(ClientProtocol.class, this.interTrackerServer = RPC.getServer(JobTracker.class, // All protocols in JobTracker
this, this,
addr.getHostName(), addr.getHostName(),
addr.getPort(), handlerCount, addr.getPort(), handlerCount,