diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index a1181936786..54df8d8fdc4 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -10,6 +10,9 @@ Release 0.23-PB - Unreleased HADOOP-7687 Make getProtocolSignature public (sanjay) + HADOOP-7693. Enhance AvroRpcEngine to support the new #addProtocol + interface introduced in HADOOP-7524. (cutting) + Release 0.23.1 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java index 1b73351bf16..12aa04ff855 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java @@ -29,6 +29,8 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.HashMap; import javax.net.SocketFactory; @@ -54,7 +56,7 @@ public class AvroRpcEngine implements RpcEngine { private static final Log LOG = LogFactory.getLog(RPC.class); - private static int VERSION = 0; + private static int VERSION = 1; // the implementation we tunnel through private static final RpcEngine ENGINE = new WritableRpcEngine(); @@ -62,9 +64,10 @@ public class AvroRpcEngine implements RpcEngine { /** Tunnel an Avro RPC request and response through Hadoop's RPC. */ private static interface TunnelProtocol extends VersionedProtocol { //WritableRpcEngine expects a versionID in every protocol. - public static final long versionID = 0L; + public static final long versionID = VERSION; /** All Avro methods and responses go through this. */ - BufferListWritable call(BufferListWritable request) throws IOException; + BufferListWritable call(String protocol, BufferListWritable request) + throws IOException; } /** A Writable that holds a List, The Avro RPC Transceiver's @@ -103,23 +106,25 @@ public void write(DataOutput out) throws IOException { private static class ClientTransceiver extends Transceiver { private TunnelProtocol tunnel; private InetSocketAddress remote; + private String protocol; public ClientTransceiver(InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, - int rpcTimeout) + int rpcTimeout, String protocol) throws IOException { this.tunnel = ENGINE.getProxy(TunnelProtocol.class, VERSION, addr, ticket, conf, factory, rpcTimeout).getProxy(); this.remote = addr; + this.protocol = protocol; } public String getRemoteName() { return remote.toString(); } public List transceive(List request) throws IOException { - return tunnel.call(new BufferListWritable(request)).buffers; + return tunnel.call(protocol, new BufferListWritable(request)).buffers; } public List readBuffers() throws IOException { @@ -159,7 +164,8 @@ public Invoker(Class protocol, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { - this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout); + this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout, + protocol.getName()); this.requestor = createRequestor(protocol, tx); } @Override public Object invoke(Object proxy, Method method, Object[] args) @@ -182,9 +188,11 @@ protected Responder createResponder(Class iface, Object impl) { /** An Avro RPC Responder that can process requests passed via Hadoop RPC. */ private class TunnelResponder implements TunnelProtocol { - private Responder responder; - public TunnelResponder(Class iface, Object impl) { - responder = createResponder(iface, impl); + private Map responders = + new HashMap(); + + public void addProtocol(Class iface, Object impl) { + responders.put(iface.getName(), createResponder(iface, impl)); } @Override @@ -197,13 +205,18 @@ public long getProtocolVersion(String protocol, long version) public ProtocolSignature getProtocolSignature( String protocol, long version, int clientMethodsHashCode) throws IOException { - return new ProtocolSignature(VERSION, null); + return ProtocolSignature.getProtocolSignature + (clientMethodsHashCode, VERSION, TunnelProtocol.class); } - public BufferListWritable call(final BufferListWritable request) + public BufferListWritable call(String protocol, BufferListWritable request) throws IOException { + Responder responder = responders.get(protocol); + if (responder == null) + throw new IOException("No responder for: "+protocol); return new BufferListWritable(responder.respond(request.buffers)); } + } public Object[] call(Method method, Object[][] params, @@ -212,6 +225,32 @@ public Object[] call(Method method, Object[][] params, throw new UnsupportedOperationException(); } + private class Server extends WritableRpcEngine.Server { + private TunnelResponder responder = new TunnelResponder(); + + public Server(Class iface, Object impl, String bindAddress, + int port, int numHandlers, int numReaders, + int queueSizePerHandler, boolean verbose, + Configuration conf, + SecretManager secretManager + ) throws IOException { + super((Class)null, new Object(), conf, + bindAddress, port, numHandlers, numReaders, + queueSizePerHandler, verbose, secretManager); + super.addProtocol(TunnelProtocol.class, responder); + responder.addProtocol(iface, impl); + } + + + @Override + public Server + addProtocol(Class protocolClass, IMPL protocolImpl) + throws IOException { + responder.addProtocol(protocolClass, protocolImpl); + return this; + } + } + /** Construct a server for a protocol implementation instance listening on a * port and address. */ public RPC.Server getServer(Class iface, Object impl, String bindAddress, @@ -220,10 +259,9 @@ public RPC.Server getServer(Class iface, Object impl, String bindAddress, Configuration conf, SecretManager secretManager ) throws IOException { - return ENGINE.getServer(TunnelProtocol.class, - new TunnelResponder(iface, impl), - bindAddress, port, numHandlers, numReaders, - queueSizePerHandler, verbose, conf, secretManager); + return new Server + (iface, impl, bindAddress, port, numHandlers, numReaders, + queueSizePerHandler, verbose, conf, secretManager); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index beb8cc1cfa1..5256e5e9356 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -605,7 +605,7 @@ protected Server(String bindAddress, int port, * @param protocolImpl - the impl of the protocol that will be called * @return the server (for convenience) */ - public + public Server addProtocol(Class protocolClass, IMPL protocolImpl ) throws IOException { throw new IOException("addProtocol Not Implemented"); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 75b8d51f057..d1a80361850 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -555,7 +555,7 @@ public Server(Class protocolClass, Object protocolImpl, @Override - public Server + public Server addProtocol( Class protocolClass, IMPL protocolImpl) throws IOException { registerProtocolAndImpl(protocolClass, protocolImpl); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java index 7f7c510ab4f..e7b6657a5c5 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java @@ -43,6 +43,7 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.fs.CommonConfigurationKeys; /** Unit tests for AvroRpc. */ public class TestAvroRpc extends TestCase { @@ -56,6 +57,9 @@ public class TestAvroRpc extends TestCase { public TestAvroRpc(String name) { super(name); } + public static interface EmptyProtocol {} + public static class EmptyImpl implements EmptyProtocol {} + public static class TestImpl implements AvroTestProtocol { public void ping() {} @@ -93,10 +97,12 @@ private void testReflect(boolean secure) throws Exception { sm = new TestTokenSecretManager(); } UserGroupInformation.setConfiguration(conf); + RPC.setProtocolEngine(conf, EmptyProtocol.class, AvroRpcEngine.class); RPC.setProtocolEngine(conf, AvroTestProtocol.class, AvroRpcEngine.class); - Server server = RPC.getServer(AvroTestProtocol.class, - new TestImpl(), ADDRESS, 0, 5, true, - conf, sm); + RPC.Server server = RPC.getServer(EmptyProtocol.class, new EmptyImpl(), + ADDRESS, 0, 5, true, conf, sm); + server.addProtocol(AvroTestProtocol.class, new TestImpl()); + try { server.start(); InetSocketAddress addr = NetUtils.getConnectAddress(server);