svn merge -c 1177399 from trunk for HADOOP-7693.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1228685 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-01-07 17:59:30 +00:00
parent b0c952da25
commit 15a9e098cb
5 changed files with 67 additions and 20 deletions

View File

@ -10,6 +10,9 @@ Release 0.23-PB - Unreleased
HADOOP-7687 Make getProtocolSignature public (sanjay) HADOOP-7687 Make getProtocolSignature public (sanjay)
HADOOP-7693. Enhance AvroRpcEngine to support the new #addProtocol
interface introduced in HADOOP-7524. (cutting)
Release 0.23.1 - Unreleased Release 0.23.1 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -29,6 +29,8 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.HashMap;
import javax.net.SocketFactory; import javax.net.SocketFactory;
@ -54,7 +56,7 @@
public class AvroRpcEngine implements RpcEngine { public class AvroRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class); private static final Log LOG = LogFactory.getLog(RPC.class);
private static int VERSION = 0; private static int VERSION = 1;
// the implementation we tunnel through // the implementation we tunnel through
private static final RpcEngine ENGINE = new WritableRpcEngine(); private static final RpcEngine ENGINE = new WritableRpcEngine();
@ -62,9 +64,10 @@ public class AvroRpcEngine implements RpcEngine {
/** Tunnel an Avro RPC request and response through Hadoop's RPC. */ /** Tunnel an Avro RPC request and response through Hadoop's RPC. */
private static interface TunnelProtocol extends VersionedProtocol { private static interface TunnelProtocol extends VersionedProtocol {
//WritableRpcEngine expects a versionID in every protocol. //WritableRpcEngine expects a versionID in every protocol.
public static final long versionID = 0L; public static final long versionID = VERSION;
/** All Avro methods and responses go through this. */ /** All Avro methods and responses go through this. */
BufferListWritable call(BufferListWritable request) throws IOException; BufferListWritable call(String protocol, BufferListWritable request)
throws IOException;
} }
/** A Writable that holds a List<ByteBuffer>, The Avro RPC Transceiver's /** A Writable that holds a List<ByteBuffer>, The Avro RPC Transceiver's
@ -103,23 +106,25 @@ public void write(DataOutput out) throws IOException {
private static class ClientTransceiver extends Transceiver { private static class ClientTransceiver extends Transceiver {
private TunnelProtocol tunnel; private TunnelProtocol tunnel;
private InetSocketAddress remote; private InetSocketAddress remote;
private String protocol;
public ClientTransceiver(InetSocketAddress addr, public ClientTransceiver(InetSocketAddress addr,
UserGroupInformation ticket, UserGroupInformation ticket,
Configuration conf, SocketFactory factory, Configuration conf, SocketFactory factory,
int rpcTimeout) int rpcTimeout, String protocol)
throws IOException { throws IOException {
this.tunnel = ENGINE.getProxy(TunnelProtocol.class, VERSION, this.tunnel = ENGINE.getProxy(TunnelProtocol.class, VERSION,
addr, ticket, conf, factory, addr, ticket, conf, factory,
rpcTimeout).getProxy(); rpcTimeout).getProxy();
this.remote = addr; this.remote = addr;
this.protocol = protocol;
} }
public String getRemoteName() { return remote.toString(); } public String getRemoteName() { return remote.toString(); }
public List<ByteBuffer> transceive(List<ByteBuffer> request) public List<ByteBuffer> transceive(List<ByteBuffer> request)
throws IOException { throws IOException {
return tunnel.call(new BufferListWritable(request)).buffers; return tunnel.call(protocol, new BufferListWritable(request)).buffers;
} }
public List<ByteBuffer> readBuffers() throws IOException { public List<ByteBuffer> readBuffers() throws IOException {
@ -159,7 +164,8 @@ public Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, SocketFactory factory,
int rpcTimeout) throws IOException { int rpcTimeout) throws IOException {
this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout); this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout,
protocol.getName());
this.requestor = createRequestor(protocol, tx); this.requestor = createRequestor(protocol, tx);
} }
@Override public Object invoke(Object proxy, Method method, Object[] args) @Override public Object invoke(Object proxy, Method method, Object[] args)
@ -182,9 +188,11 @@ protected Responder createResponder(Class<?> iface, Object impl) {
/** An Avro RPC Responder that can process requests passed via Hadoop RPC. */ /** An Avro RPC Responder that can process requests passed via Hadoop RPC. */
private class TunnelResponder implements TunnelProtocol { private class TunnelResponder implements TunnelProtocol {
private Responder responder; private Map<String, Responder> responders =
public TunnelResponder(Class<?> iface, Object impl) { new HashMap<String, Responder>();
responder = createResponder(iface, impl);
public void addProtocol(Class<?> iface, Object impl) {
responders.put(iface.getName(), createResponder(iface, impl));
} }
@Override @Override
@ -197,13 +205,18 @@ public long getProtocolVersion(String protocol, long version)
public ProtocolSignature getProtocolSignature( public ProtocolSignature getProtocolSignature(
String protocol, long version, int clientMethodsHashCode) String protocol, long version, int clientMethodsHashCode)
throws IOException { throws IOException {
return new ProtocolSignature(VERSION, null); return ProtocolSignature.getProtocolSignature
(clientMethodsHashCode, VERSION, TunnelProtocol.class);
} }
public BufferListWritable call(final BufferListWritable request) public BufferListWritable call(String protocol, BufferListWritable request)
throws IOException { throws IOException {
Responder responder = responders.get(protocol);
if (responder == null)
throw new IOException("No responder for: "+protocol);
return new BufferListWritable(responder.respond(request.buffers)); return new BufferListWritable(responder.respond(request.buffers));
} }
} }
public Object[] call(Method method, Object[][] params, public Object[] call(Method method, Object[][] params,
@ -212,6 +225,32 @@ public Object[] call(Method method, Object[][] params,
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
private class Server extends WritableRpcEngine.Server {
private TunnelResponder responder = new TunnelResponder();
public Server(Class<?> iface, Object impl, String bindAddress,
int port, int numHandlers, int numReaders,
int queueSizePerHandler, boolean verbose,
Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager
) throws IOException {
super((Class)null, new Object(), conf,
bindAddress, port, numHandlers, numReaders,
queueSizePerHandler, verbose, secretManager);
super.addProtocol(TunnelProtocol.class, responder);
responder.addProtocol(iface, impl);
}
@Override
public <PROTO, IMPL extends PROTO> Server
addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl)
throws IOException {
responder.addProtocol(protocolClass, protocolImpl);
return this;
}
}
/** Construct a server for a protocol implementation instance listening on a /** Construct a server for a protocol implementation instance listening on a
* port and address. */ * port and address. */
public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress, public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress,
@ -220,10 +259,9 @@ public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress,
Configuration conf, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager SecretManager<? extends TokenIdentifier> secretManager
) throws IOException { ) throws IOException {
return ENGINE.getServer(TunnelProtocol.class, return new Server
new TunnelResponder(iface, impl), (iface, impl, bindAddress, port, numHandlers, numReaders,
bindAddress, port, numHandlers, numReaders, queueSizePerHandler, verbose, conf, secretManager);
queueSizePerHandler, verbose, conf, secretManager);
} }
} }

View File

@ -605,7 +605,7 @@ protected Server(String bindAddress, int port,
* @param protocolImpl - the impl of the protocol that will be called * @param protocolImpl - the impl of the protocol that will be called
* @return the server (for convenience) * @return the server (for convenience)
*/ */
public <PROTO extends VersionedProtocol, IMPL extends PROTO> public <PROTO, IMPL extends PROTO>
Server addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl Server addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl
) throws IOException { ) throws IOException {
throw new IOException("addProtocol Not Implemented"); throw new IOException("addProtocol Not Implemented");

View File

@ -555,7 +555,7 @@ public Server(Class<?> protocolClass, Object protocolImpl,
@Override @Override
public <PROTO extends VersionedProtocol, IMPL extends PROTO> Server public <PROTO, IMPL extends PROTO> Server
addProtocol( addProtocol(
Class<PROTO> protocolClass, IMPL protocolImpl) throws IOException { Class<PROTO> protocolClass, IMPL protocolImpl) throws IOException {
registerProtocolAndImpl(protocolClass, protocolImpl); registerProtocolAndImpl(protocolClass, protocolImpl);

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.fs.CommonConfigurationKeys;
/** Unit tests for AvroRpc. */ /** Unit tests for AvroRpc. */
public class TestAvroRpc extends TestCase { public class TestAvroRpc extends TestCase {
@ -56,6 +57,9 @@ public class TestAvroRpc extends TestCase {
public TestAvroRpc(String name) { super(name); } public TestAvroRpc(String name) { super(name); }
public static interface EmptyProtocol {}
public static class EmptyImpl implements EmptyProtocol {}
public static class TestImpl implements AvroTestProtocol { public static class TestImpl implements AvroTestProtocol {
public void ping() {} public void ping() {}
@ -93,10 +97,12 @@ private void testReflect(boolean secure) throws Exception {
sm = new TestTokenSecretManager(); sm = new TestTokenSecretManager();
} }
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
RPC.setProtocolEngine(conf, EmptyProtocol.class, AvroRpcEngine.class);
RPC.setProtocolEngine(conf, AvroTestProtocol.class, AvroRpcEngine.class); RPC.setProtocolEngine(conf, AvroTestProtocol.class, AvroRpcEngine.class);
Server server = RPC.getServer(AvroTestProtocol.class, RPC.Server server = RPC.getServer(EmptyProtocol.class, new EmptyImpl(),
new TestImpl(), ADDRESS, 0, 5, true, ADDRESS, 0, 5, true, conf, sm);
conf, sm); server.addProtocol(AvroTestProtocol.class, new TestImpl());
try { try {
server.start(); server.start();
InetSocketAddress addr = NetUtils.getConnectAddress(server); InetSocketAddress addr = NetUtils.getConnectAddress(server);