HADOOP-6904. Support method based RPC compatiblity. Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1064919 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hairong Kuang 2011-01-28 22:45:58 +00:00
parent 077bc4af3c
commit b1e3037296
12 changed files with 865 additions and 63 deletions

View File

@ -4,6 +4,8 @@ Trunk (unreleased changes)
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
HADOOP-6904. Support method based RPC compatiblity. (hairong)
NEW FEATURES NEW FEATURES
HADOOP-7023. Add listCorruptFileBlocks to Filesysem. (Patrick Kling HADOOP-7023. Add listCorruptFileBlocks to Filesysem. (Patrick Kling

View File

@ -107,10 +107,9 @@ public class AvroRpcEngine implements RpcEngine {
Configuration conf, SocketFactory factory, Configuration conf, SocketFactory factory,
int rpcTimeout) int rpcTimeout)
throws IOException { throws IOException {
this.tunnel = this.tunnel = ENGINE.getProxy(TunnelProtocol.class, VERSION,
(TunnelProtocol)ENGINE.getProxy(TunnelProtocol.class, VERSION,
addr, ticket, conf, factory, addr, ticket, conf, factory,
rpcTimeout); rpcTimeout).getProxy();
this.remote = addr; this.remote = addr;
} }
@ -135,16 +134,20 @@ public class AvroRpcEngine implements RpcEngine {
} }
/** Construct a client-side proxy object that implements the named protocol, /** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */ * talking to a server at the named address.
public Object getProxy(Class<?> protocol, long clientVersion, * @param <T>*/
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory, Configuration conf, SocketFactory factory,
int rpcTimeout) int rpcTimeout)
throws IOException { throws IOException {
return Proxy.newProxyInstance return new ProtocolProxy<T>(protocol,
(protocol.getClassLoader(), (T)Proxy.newProxyInstance(
new Class[] { protocol }, protocol.getClassLoader(),
new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); new Class[] { protocol },
new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)),
null);
} }
/** Stop this proxy. */ /** Stop this proxy. */
@ -191,11 +194,19 @@ public class AvroRpcEngine implements RpcEngine {
responder = createResponder(iface, impl); responder = createResponder(iface, impl);
} }
@Override
public long getProtocolVersion(String protocol, long version) public long getProtocolVersion(String protocol, long version)
throws IOException { throws IOException {
return VERSION; return VERSION;
} }
@Override
public ProtocolSignature getProtocolSignature(
String protocol, long version, int clientMethodsHashCode)
throws IOException {
return new ProtocolSignature(VERSION, null);
}
public BufferListWritable call(final BufferListWritable request) public BufferListWritable call(final BufferListWritable request)
throws IOException { throws IOException {
return new BufferListWritable(responder.respond(request.buffers)); return new BufferListWritable(responder.respond(request.buffers));

View File

@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.HashSet;
/**
* a class wraps around a server's proxy,
* containing a list of its supported methods.
*
* A list of methods with a value of null indicates that the client and server
* have the same protocol.
*/
public class ProtocolProxy<T> {
private Class<T> protocol;
private T proxy;
private HashSet<Integer> serverMethods = null;
/**
* Constructor
*
* @param protocol protocol class
* @param proxy its proxy
* @param serverMethods a list of hash codes of the methods that it supports
* @throws ClassNotFoundException
*/
public ProtocolProxy(Class<T> protocol, T proxy, int[] serverMethods) {
this.protocol = protocol;
this.proxy = proxy;
if (serverMethods != null) {
this.serverMethods = new HashSet<Integer>(serverMethods.length);
for (int method : serverMethods) {
this.serverMethods.add(Integer.valueOf(method));
}
}
}
/*
* Get the proxy
*/
public T getProxy() {
return proxy;
}
/**
* Check if a method is supported by the server or not
*
* @param methodName a method's name in String format
* @param parameterTypes a method's parameter types
* @return true if the method is supported by the server
*/
public boolean isMethodSupported(String methodName,
Class<?>... parameterTypes)
throws IOException {
if (serverMethods == null) { // client & server have the same protocol
return true;
}
Method method;
try {
method = protocol.getDeclaredMethod(methodName, parameterTypes);
} catch (SecurityException e) {
throw new IOException(e);
} catch (NoSuchMethodException e) {
throw new IOException(e);
}
return serverMethods.contains(
Integer.valueOf(ProtocolSignature.getFingerprint(method)));
}
}

View File

@ -0,0 +1,241 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
public class ProtocolSignature implements Writable {
static { // register a ctor
WritableFactories.setFactory
(ProtocolSignature.class,
new WritableFactory() {
public Writable newInstance() { return new ProtocolSignature(); }
});
}
private long version;
private int[] methods = null; // an array of method hash codes
/**
* default constructor
*/
public ProtocolSignature() {
}
/**
* Constructor
*
* @param version server version
* @param methodHashcodes hash codes of the methods supported by server
*/
public ProtocolSignature(long version, int[] methodHashcodes) {
this.version = version;
this.methods = methodHashcodes;
}
public long getVersion() {
return version;
}
public int[] getMethods() {
return methods;
}
@Override
public void readFields(DataInput in) throws IOException {
version = in.readLong();
boolean hasMethods = in.readBoolean();
if (hasMethods) {
int numMethods = in.readInt();
methods = new int[numMethods];
for (int i=0; i<numMethods; i++) {
methods[i] = in.readInt();
}
}
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(version);
if (methods == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeInt(methods.length);
for (int method : methods) {
out.writeInt(method);
}
}
}
/**
* Calculate a method's hash code considering its method
* name, returning type, and its parameter types
*
* @param method a method
* @return its hash code
*/
static int getFingerprint(Method method) {
int hashcode = method.getName().hashCode();
hashcode = hashcode + 31*method.getReturnType().getName().hashCode();
for (Class<?> type : method.getParameterTypes()) {
hashcode = 31*hashcode ^ type.getName().hashCode();
}
return hashcode;
}
/**
* Convert an array of Method into an array of hash codes
*
* @param methods
* @return array of hash codes
*/
private static int[] getFingerprints(Method[] methods) {
if (methods == null) {
return null;
}
int[] hashCodes = new int[methods.length];
for (int i = 0; i<methods.length; i++) {
hashCodes[i] = getFingerprint(methods[i]);
}
return hashCodes;
}
/**
* Get the hash code of an array of methods
* Methods are sorted before hashcode is calculated.
* So the returned value is irrelevant of the method order in the array.
*
* @param methods an array of methods
* @return the hash code
*/
static int getFingerprint(Method[] methods) {
return getFingerprint(getFingerprints(methods));
}
/**
* Get the hash code of an array of hashcodes
* Hashcodes are sorted before hashcode is calculated.
* So the returned value is irrelevant of the hashcode order in the array.
*
* @param methods an array of methods
* @return the hash code
*/
static int getFingerprint(int[] hashcodes) {
Arrays.sort(hashcodes);
return Arrays.hashCode(hashcodes);
}
private static class ProtocolSigFingerprint {
private ProtocolSignature signature;
private int fingerprint;
ProtocolSigFingerprint(ProtocolSignature sig, int fingerprint) {
this.signature = sig;
this.fingerprint = fingerprint;
}
}
/**
* A cache that maps a protocol's name to its signature & finger print
*/
final private static HashMap<String, ProtocolSigFingerprint>
PROTOCOL_FINGERPRINT_CACHE =
new HashMap<String, ProtocolSigFingerprint>();
/**
* Return a protocol's signature and finger print from cache
*
* @param protocol a protocol class
* @param serverVersion protocol version
* @return its signature and finger print
*/
private static ProtocolSigFingerprint getSigFingerprint(
Class <? extends VersionedProtocol> protocol, long serverVersion) {
String protocolName = protocol.getName();
synchronized (PROTOCOL_FINGERPRINT_CACHE) {
ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName);
if (sig == null) {
int[] serverMethodHashcodes = getFingerprints(protocol.getMethods());
sig = new ProtocolSigFingerprint(
new ProtocolSignature(serverVersion, serverMethodHashcodes),
getFingerprint(serverMethodHashcodes));
PROTOCOL_FINGERPRINT_CACHE.put(protocolName, sig);
}
return sig;
}
}
/**
* Get a server protocol's signature
*
* @param clientMethodsHashCode client protocol methods hashcode
* @param serverVersion server protocol version
* @param protocol protocol
* @return the server's protocol signature
*/
static ProtocolSignature getProtocolSignature(
int clientMethodsHashCode,
long serverVersion,
Class<? extends VersionedProtocol> protocol) {
// try to get the finger print & signature from the cache
ProtocolSigFingerprint sig = getSigFingerprint(protocol, serverVersion);
// check if the client side protocol matches the one on the server side
if (clientMethodsHashCode == sig.fingerprint) {
return new ProtocolSignature(serverVersion, null); // null indicates a match
}
return sig.signature;
}
/**
* Get a server protocol's signature
*
* @param server server implementation
* @param protocol server protocol
* @param clientVersion client's version
* @param clientMethodsHash client's protocol's hash code
* @return the server protocol's signature
* @throws IOException if any error occurs
*/
@SuppressWarnings("unchecked")
public static ProtocolSignature getProtocolSigature(VersionedProtocol server,
String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
Class<? extends VersionedProtocol> inter;
try {
inter = (Class<? extends VersionedProtocol>)Class.forName(protocol);
} catch (Exception e) {
throw new IOException(e);
}
long serverVersion = server.getProtocolVersion(protocol, clientVersion);
return ProtocolSignature.getProtocolSignature(
clientMethodsHash, serverVersion, inter);
}
}

View File

@ -37,12 +37,9 @@ import org.apache.hadoop.io.*;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.conf.*; import org.apache.hadoop.conf.*;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
/** A simple RPC mechanism. /** A simple RPC mechanism.
@ -64,7 +61,7 @@ import org.apache.hadoop.util.ReflectionUtils;
* the protocol instance is transmitted. * the protocol instance is transmitted.
*/ */
public class RPC { public class RPC {
private static final Log LOG = LogFactory.getLog(RPC.class); static final Log LOG = LogFactory.getLog(RPC.class);
private RPC() {} // no public ctor private RPC() {} // no public ctor
@ -160,17 +157,47 @@ public class RPC {
} }
} }
public static Object waitForProxy( /**
Class<?> protocol, * Get a proxy connection to a remote server
*
* @param protocol protocol class
* @param clientVersion client version
* @param addr remote address
* @param conf configuration to use
* @return the proxy
* @throws IOException if the far end through a RemoteException
*/
public static <T> T waitForProxy(
Class<T> protocol,
long clientVersion, long clientVersion,
InetSocketAddress addr, InetSocketAddress addr,
Configuration conf Configuration conf
) throws IOException { ) throws IOException {
return waitForProxy(protocol, clientVersion, addr, conf, Long.MAX_VALUE); return waitForProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
}
/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server
*
* @param protocol protocol class
* @param clientVersion client version
* @param addr remote address
* @param conf configuration to use
* @return the protocol proxy
* @throws IOException if the far end through a RemoteException
*/
public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf) throws IOException {
return waitForProtocolProxy(
protocol, clientVersion, addr, conf, Long.MAX_VALUE);
} }
/** /**
* Get a proxy connection to a remote server * Get a proxy connection to a remote server
*
* @param protocol protocol class * @param protocol protocol class
* @param clientVersion client version * @param clientVersion client version
* @param addr remote address * @param addr remote address
@ -179,23 +206,68 @@ public class RPC {
* @return the proxy * @return the proxy
* @throws IOException if the far end through a RemoteException * @throws IOException if the far end through a RemoteException
*/ */
public static Object waitForProxy(Class<?> protocol, long clientVersion, public static <T> T waitForProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf, InetSocketAddress addr, Configuration conf,
long connTimeout) throws IOException { long connTimeout) throws IOException {
return waitForProxy(protocol, clientVersion, addr, conf, 0, connTimeout); return waitForProtocolProxy(protocol, clientVersion, addr,
conf, connTimeout).getProxy();
} }
/**
* Get a proxy connection to a remote server /**
* @param protocol protocol class * Get a protocol proxy that contains a proxy connection to a remote server
* @param clientVersion client version * and a set of methods that are supported by the server
* @param addr remote address *
* @param conf configuration to use * @param protocol protocol class
* @param rpcTimeout timeout for each RPC * @param clientVersion client version
* @param timeout time in milliseconds before giving up * @param addr remote address
* @return the proxy * @param conf configuration to use
* @throws IOException if the far end through a RemoteException * @param connTimeout time in milliseconds before giving up
*/ * @return the protocol proxy
public static Object waitForProxy(Class<?> protocol, long clientVersion, * @throws IOException if the far end through a RemoteException
*/
public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr, Configuration conf,
long connTimeout) throws IOException {
return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, connTimeout);
}
/**
* Get a proxy connection to a remote server
*
* @param protocol protocol class
* @param clientVersion client version
* @param addr remote address
* @param conf configuration to use
* @param rpcTimeout timeout for each RPC
* @param timeout time in milliseconds before giving up
* @return the proxy
* @throws IOException if the far end through a RemoteException
*/
public static <T> T waitForProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr, Configuration conf,
int rpcTimeout,
long timeout) throws IOException {
return waitForProtocolProxy(protocol, clientVersion, addr,
conf, rpcTimeout, timeout).getProxy();
}
/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server
*
* @param protocol protocol class
* @param clientVersion client version
* @param addr remote address
* @param conf configuration to use
* @param rpcTimeout timeout for each RPC
* @param timeout time in milliseconds before giving up
* @return the proxy
* @throws IOException if the far end through a RemoteException
*/
public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr, Configuration conf, InetSocketAddress addr, Configuration conf,
int rpcTimeout, int rpcTimeout,
long timeout) throws IOException { long timeout) throws IOException {
@ -203,7 +275,7 @@ public class RPC {
IOException ioe; IOException ioe;
while (true) { while (true) {
try { try {
return getProxy(protocol, clientVersion, addr, return getProtocolProxy(protocol, clientVersion, addr,
UserGroupInformation.getCurrentUser(), conf, NetUtils UserGroupInformation.getCurrentUser(), conf, NetUtils
.getDefaultSocketFactory(conf), rpcTimeout); .getDefaultSocketFactory(conf), rpcTimeout);
} catch(ConnectException se) { // namenode has not been started } catch(ConnectException se) { // namenode has not been started
@ -231,27 +303,76 @@ public class RPC {
} }
/** Construct a client-side proxy object that implements the named protocol, /** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */ * talking to a server at the named address.
public static Object getProxy(Class<?> protocol, long clientVersion, * @param <T>*/
public static <T> T getProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr, Configuration conf,
SocketFactory factory) throws IOException {
return getProtocolProxy(
protocol, clientVersion, addr, conf, factory).getProxy();
}
/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server
*
* @param protocol protocol class
* @param clientVersion client version
* @param addr remote address
* @param conf configuration to use
* @param factory socket factory
* @return the protocol proxy
* @throws IOException if the far end through a RemoteException
*/
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr, Configuration conf, InetSocketAddress addr, Configuration conf,
SocketFactory factory) throws IOException { SocketFactory factory) throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
return getProxy(protocol, clientVersion, addr, ugi, conf, factory); return getProtocolProxy(protocol, clientVersion, addr, ugi, conf, factory);
} }
/** Construct a client-side proxy object that implements the named protocol, /** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */ * talking to a server at the named address.
public static Object getProxy(Class<?> protocol, long clientVersion, * @param <T>*/
public static <T> T getProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr, InetSocketAddress addr,
UserGroupInformation ticket, UserGroupInformation ticket,
Configuration conf, Configuration conf,
SocketFactory factory) throws IOException { SocketFactory factory) throws IOException {
return getProxy(protocol, clientVersion, addr, ticket, conf, factory, 0); return getProtocolProxy(
protocol, clientVersion, addr, ticket, conf, factory).getProxy();
}
/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server
*
* @param protocol protocol class
* @param clientVersion client version
* @param addr remote address
* @param ticket user group information
* @param conf configuration to use
* @param factory socket factory
* @return the protocol proxy
* @throws IOException if the far end through a RemoteException
*/
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory) throws IOException {
return getProtocolProxy(
protocol, clientVersion, addr, ticket, conf, factory, 0);
} }
/** /**
* Construct a client-side proxy that implements the named protocol, * Construct a client-side proxy that implements the named protocol,
* talking to a server at the named address. * talking to a server at the named address.
* @param <T>
* *
* @param protocol protocol * @param protocol protocol
* @param clientVersion client's version * @param clientVersion client's version
@ -263,7 +384,33 @@ public class RPC {
* @return the proxy * @return the proxy
* @throws IOException if any error occurs * @throws IOException if any error occurs
*/ */
public static Object getProxy(Class<?> protocol, long clientVersion, public static <T> T getProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory,
int rpcTimeout) throws IOException {
return getProtocolProxy(protocol, clientVersion, addr, ticket,
conf, factory, rpcTimeout).getProxy();
}
/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server
*
* @param protocol protocol
* @param clientVersion client's version
* @param addr server address
* @param ticket security ticket
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout max time for each rpc; 0 means no timeout
* @return the proxy
* @throws IOException if any error occurs
*/
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr, InetSocketAddress addr,
UserGroupInformation ticket, UserGroupInformation ticket,
Configuration conf, Configuration conf,
@ -276,21 +423,42 @@ public class RPC {
clientVersion, addr, ticket, conf, factory, rpcTimeout); clientVersion, addr, ticket, conf, factory, rpcTimeout);
} }
/**
* Construct a client-side proxy object with the default SocketFactory
* @param <T>
*
* @param protocol
* @param clientVersion
* @param addr
* @param conf
* @return a proxy instance
* @throws IOException
*/
public static <T> T getProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr, Configuration conf)
throws IOException {
return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
}
/** /**
* Construct a client-side proxy object with the default SocketFactory * Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server
* *
* @param protocol * @param protocol
* @param clientVersion * @param clientVersion
* @param addr * @param addr
* @param conf * @param conf
* @return a proxy instance * @return a protocol proxy
* @throws IOException * @throws IOException
*/ */
public static Object getProxy(Class<?> protocol, long clientVersion, public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr, Configuration conf) InetSocketAddress addr, Configuration conf)
throws IOException { throws IOException {
return getProxy(protocol, clientVersion, addr, conf, NetUtils return getProtocolProxy(protocol, clientVersion, addr, conf, NetUtils
.getDefaultSocketFactory(conf)); .getDefaultSocketFactory(conf));
} }

View File

@ -34,8 +34,9 @@ import org.apache.hadoop.security.token.TokenIdentifier;
@InterfaceStability.Evolving @InterfaceStability.Evolving
public interface RpcEngine { public interface RpcEngine {
/** Construct a client-side proxy object. */ /** Construct a client-side proxy object.
Object getProxy(Class<?> protocol, * @param <T>*/
<T> ProtocolProxy<T> getProxy(Class<T> protocol,
long clientVersion, InetSocketAddress addr, long clientVersion, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException; SocketFactory factory, int rpcTimeout) throws IOException;

View File

@ -32,7 +32,23 @@ public interface VersionedProtocol {
* @param protocol The classname of the protocol interface * @param protocol The classname of the protocol interface
* @param clientVersion The version of the protocol that the client speaks * @param clientVersion The version of the protocol that the client speaks
* @return the version that the server will speak * @return the version that the server will speak
* @throws IOException if any IO error occurs
*/ */
@Deprecated
public long getProtocolVersion(String protocol, public long getProtocolVersion(String protocol,
long clientVersion) throws IOException; long clientVersion) throws IOException;
/**
* Return protocol version corresponding to protocol interface.
* @param protocol The classname of the protocol interface
* @param clientVersion The version of the protocol that the client speaks
* @param clientMethodsHash the hashcode of client protocol methods
* @return the server protocol signature containing its version and
* a list of its supported methods
* @see ProtocolSignature#getProtocolSigature(VersionedProtocol, String,
* long, int) for a default implementation
*/
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion,
int clientMethodsHash) throws IOException;
} }

View File

@ -35,7 +35,6 @@ import org.apache.commons.logging.*;
import org.apache.hadoop.io.*; import org.apache.hadoop.io.*;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -221,25 +220,32 @@ public class WritableRpcEngine implements RpcEngine {
} }
/** Construct a client-side proxy object that implements the named protocol, /** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */ * talking to a server at the named address.
public Object getProxy(Class<?> protocol, long clientVersion, * @param <T>*/
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory, Configuration conf, SocketFactory factory,
int rpcTimeout) int rpcTimeout)
throws IOException { throws IOException {
Object proxy = Proxy.newProxyInstance T proxy = (T)Proxy.newProxyInstance
(protocol.getClassLoader(), new Class[] { protocol }, (protocol.getClassLoader(), new Class[] { protocol },
new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
int[] serverMethods = null;
if (proxy instanceof VersionedProtocol) { if (proxy instanceof VersionedProtocol) {
long serverVersion = ((VersionedProtocol)proxy) ProtocolSignature serverInfo = ((VersionedProtocol)proxy)
.getProtocolVersion(protocol.getName(), clientVersion); .getProtocolSignature(protocol.getName(), clientVersion,
ProtocolSignature.getFingerprint(protocol.getMethods()));
long serverVersion = serverInfo.getVersion();
if (serverVersion != clientVersion) { if (serverVersion != clientVersion) {
throw new RPC.VersionMismatch(protocol.getName(), clientVersion, throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
serverVersion); serverVersion);
} }
serverMethods = serverInfo.getMethods();
} }
return proxy; return new ProtocolProxy<T>(protocol, proxy, serverMethods);
} }
/** /**

View File

@ -132,6 +132,15 @@ public class MiniRPCBenchmark {
throw new IOException("Unknown protocol: " + protocol); throw new IOException("Unknown protocol: " + protocol);
} }
@Override // VersionedProtocol
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion,
int clientMethodsHashCode) throws IOException {
if (protocol.equals(MiniProtocol.class.getName()))
return new ProtocolSignature(versionID, null);
throw new IOException("Unknown protocol: " + protocol);
}
@Override // MiniProtocol @Override // MiniProtocol
public Token<TestDelegationTokenIdentifier> getDelegationToken(Text renewer) public Token<TestDelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException { throws IOException {

View File

@ -34,7 +34,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.spi.NullContext; import org.apache.hadoop.metrics.spi.NullContext;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate; import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -81,6 +80,11 @@ public class TestRPC extends TestCase {
return TestProtocol.versionID; return TestProtocol.versionID;
} }
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
int hashcode) {
return new ProtocolSignature(TestProtocol.versionID, null);
}
public void ping() {} public void ping() {}
public synchronized void slowPing(boolean shouldSlow) { public synchronized void slowPing(boolean shouldSlow) {

View File

@ -0,0 +1,250 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import static org.junit.Assert.*;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.junit.After;
import org.junit.Test;
/** Unit test for supporting method-name based compatible RPCs. */
public class TestRPCCompatibility {
private static final String ADDRESS = "0.0.0.0";
private static InetSocketAddress addr;
private static Server server;
private ProtocolProxy<?> proxy;
public static final Log LOG =
LogFactory.getLog(TestRPCCompatibility.class);
private static Configuration conf = new Configuration();
public interface TestProtocol0 extends VersionedProtocol {
public static final long versionID = 0L;
void ping() throws IOException;
}
public interface TestProtocol1 extends TestProtocol0 {
String echo(String value) throws IOException;
}
public interface TestProtocol2 extends TestProtocol1 {
int echo(int value) throws IOException;
}
public static class TestImpl0 implements TestProtocol0 {
@Override
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException {
return versionID;
}
@SuppressWarnings("unchecked")
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHashCode)
throws IOException {
Class<? extends VersionedProtocol> inter;
try {
inter = (Class<? extends VersionedProtocol>)getClass().getGenericInterfaces()[0];
} catch (Exception e) {
throw new IOException(e);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHashCode,
getProtocolVersion(protocol, clientVersion), inter);
}
@Override
public void ping() { return; }
}
public static class TestImpl1 extends TestImpl0 implements TestProtocol1 {
@Override
public String echo(String value) { return value; }
}
public static class TestImpl2 extends TestImpl1 implements TestProtocol2 {
@Override
public int echo(int value) { return value; }
}
@After
public void tearDown() throws IOException {
if (proxy != null) {
RPC.stopProxy(proxy.getProxy());
}
if (server != null) {
server.stop();
}
}
@Test // old client vs new server
public void testVersion0ClientVersion1Server() throws Exception {
// create a server with two handlers
server = RPC.getServer(TestProtocol1.class,
new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
server.start();
addr = NetUtils.getConnectAddress(server);
proxy = RPC.getProtocolProxy(
TestProtocol0.class, TestProtocol0.versionID, addr, conf);
TestProtocol0 proxy0 = (TestProtocol0)proxy.getProxy();
proxy0.ping();
}
@Test // old client vs new server
public void testVersion1ClientVersion0Server() throws Exception {
// create a server with two handlers
server = RPC.getServer(TestProtocol0.class,
new TestImpl0(), ADDRESS, 0, 2, false, conf, null);
server.start();
addr = NetUtils.getConnectAddress(server);
proxy = RPC.getProtocolProxy(
TestProtocol1.class, TestProtocol1.versionID, addr, conf);
TestProtocol1 proxy1 = (TestProtocol1)proxy.getProxy();
proxy1.ping();
try {
proxy1.echo("hello");
fail("Echo should fail");
} catch(IOException e) {
}
}
private class Version2Client {
private TestProtocol2 proxy2;
private ProtocolProxy<TestProtocol2> serverInfo;
private Version2Client() throws IOException {
serverInfo = RPC.getProtocolProxy(
TestProtocol2.class, TestProtocol2.versionID, addr, conf);
proxy2 = serverInfo.getProxy();
}
public int echo(int value) throws IOException, NumberFormatException {
if (serverInfo.isMethodSupported("echo", int.class)) {
return -value; // use version 3 echo long
} else { // server is version 2
return Integer.parseInt(proxy2.echo(String.valueOf(value)));
}
}
public String echo(String value) throws IOException {
return proxy2.echo(value);
}
public void ping() throws IOException {
proxy2.ping();
}
}
@Test // Compatible new client & old server
public void testVersion2ClientVersion1Server() throws Exception {
// create a server with two handlers
server = RPC.getServer(TestProtocol1.class,
new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
server.start();
addr = NetUtils.getConnectAddress(server);
Version2Client client = new Version2Client();
client.ping();
assertEquals("hello", client.echo("hello"));
// echo(int) is not supported by server, so returning 3
// This verifies that echo(int) and echo(String)'s hash codes are different
assertEquals(3, client.echo(3));
}
@Test // equal version client and server
public void testVersion2ClientVersion2Server() throws Exception {
// create a server with two handlers
server = RPC.getServer(TestProtocol2.class,
new TestImpl2(), ADDRESS, 0, 2, false, conf, null);
server.start();
addr = NetUtils.getConnectAddress(server);
Version2Client client = new Version2Client();
client.ping();
assertEquals("hello", client.echo("hello"));
// now that echo(int) is supported by the server, echo(int) should return -3
assertEquals(-3, client.echo(3));
}
public interface TestProtocol3 {
int echo(String value);
int echo(int value);
int echo_alias(int value);
int echo(int value1, int value2);
}
@Test
public void testHashCode() throws Exception {
// make sure that overriding methods have different hashcodes
Method strMethod = TestProtocol3.class.getMethod("echo", String.class);
int stringEchoHash = ProtocolSignature.getFingerprint(strMethod);
Method intMethod = TestProtocol3.class.getMethod("echo", int.class);
int intEchoHash = ProtocolSignature.getFingerprint(intMethod);
assertFalse(stringEchoHash == intEchoHash);
// make sure methods with the same signature
// from different declaring classes have the same hash code
int intEchoHash1 = ProtocolSignature.getFingerprint(
TestProtocol2.class.getMethod("echo", int.class));
assertEquals(intEchoHash, intEchoHash1);
// Methods with the same name and parameter types but different returning
// types have different hash codes
int stringEchoHash1 = ProtocolSignature.getFingerprint(
TestProtocol2.class.getMethod("echo", String.class));
assertFalse(stringEchoHash == stringEchoHash1);
// Make sure that methods with the same returning type and parameter types
// but different method names have different hash code
int intEchoHashAlias = ProtocolSignature.getFingerprint(
TestProtocol3.class.getMethod("echo_alias", int.class));
assertFalse(intEchoHash == intEchoHashAlias);
// Make sure that methods with the same returninig type and method name but
// larger number of parameter types have different hash code
int intEchoHash2 = ProtocolSignature.getFingerprint(
TestProtocol3.class.getMethod("echo", int.class, int.class));
assertFalse(intEchoHash == intEchoHash2);
// make sure that methods order does not matter for method array hash code
int hash1 = ProtocolSignature.getFingerprint(new Method[] {intMethod, strMethod});
int hash2 = ProtocolSignature.getFingerprint(new Method[] {strMethod, intMethod});
assertEquals(hash1, hash2);
}
}

View File

@ -30,6 +30,7 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.ipc.VersionedProtocol;
@ -134,9 +135,14 @@ public class TestDoAsEffectiveUser {
public long getProtocolVersion(String protocol, long clientVersion) public long getProtocolVersion(String protocol, long clientVersion)
throws IOException { throws IOException {
// TODO Auto-generated method stub
return TestProtocol.versionID; return TestProtocol.versionID;
} }
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
return new ProtocolSignature(TestProtocol.versionID, null);
}
} }
@Test @Test
@ -161,7 +167,7 @@ public class TestDoAsEffectiveUser {
String retVal = proxyUserUgi String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() { .doAs(new PrivilegedExceptionAction<String>() {
public String run() throws IOException { public String run() throws IOException {
proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf); TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod(); String ret = proxy.aMethod();
return ret; return ret;
@ -203,7 +209,7 @@ public class TestDoAsEffectiveUser {
String retVal = proxyUserUgi String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() { .doAs(new PrivilegedExceptionAction<String>() {
public String run() throws IOException { public String run() throws IOException {
proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf); TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod(); String ret = proxy.aMethod();
return ret; return ret;
@ -250,7 +256,7 @@ public class TestDoAsEffectiveUser {
String retVal = proxyUserUgi String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() { .doAs(new PrivilegedExceptionAction<String>() {
public String run() throws IOException { public String run() throws IOException {
proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf); TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod(); String ret = proxy.aMethod();
return ret; return ret;
@ -289,7 +295,7 @@ public class TestDoAsEffectiveUser {
String retVal = proxyUserUgi String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() { .doAs(new PrivilegedExceptionAction<String>() {
public String run() throws IOException { public String run() throws IOException {
proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf); TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod(); String ret = proxy.aMethod();
return ret; return ret;
@ -368,7 +374,7 @@ public class TestDoAsEffectiveUser {
String retVal = proxyUserUgi String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() { .doAs(new PrivilegedExceptionAction<String>() {
public String run() throws IOException { public String run() throws IOException {
proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf); TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod(); String ret = proxy.aMethod();
return ret; return ret;
@ -424,7 +430,7 @@ public class TestDoAsEffectiveUser {
@Override @Override
public String run() throws Exception { public String run() throws Exception {
try { try {
proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf); TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod(); String ret = proxy.aMethod();
return ret; return ret;
@ -477,7 +483,7 @@ public class TestDoAsEffectiveUser {
@Override @Override
public String run() throws Exception { public String run() throws Exception {
try { try {
proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, newConf); TestProtocol.versionID, addr, newConf);
String ret = proxy.aMethod(); String ret = proxy.aMethod();
return ret; return ret;