diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 24d87d66b80..7a898741fac 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -3,6 +3,7 @@ Hadoop Change Log Trunk (unreleased changes) INCOMPATIBLE CHANGES + HADOOP-7920. Remove Avro Rpc. (suresh) NEW FEATURES HADOOP-7773. Add support for protocol buffer based RPC engine. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java deleted file mode 100644 index 8fec3d22b84..00000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java +++ /dev/null @@ -1,269 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ipc; - -import java.io.Closeable; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.HashMap; - -import javax.net.SocketFactory; - -import org.apache.avro.ipc.Responder; -import org.apache.avro.ipc.Transceiver; -import org.apache.avro.ipc.reflect.ReflectRequestor; -import org.apache.avro.ipc.reflect.ReflectResponder; -import org.apache.avro.ipc.specific.SpecificRequestor; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.SecretManager; -import org.apache.hadoop.security.token.TokenIdentifier; - -/** Tunnel Avro-format RPC requests over a Hadoop {@link RPC} connection. This - * does not give cross-language wire compatibility, since the Hadoop RPC wire - * format is non-standard, but it does permit use of Avro's protocol versioning - * features for inter-Java RPCs. */ -@InterfaceStability.Evolving -public class AvroRpcEngine implements RpcEngine { - private static final Log LOG = LogFactory.getLog(RPC.class); - - private static int VERSION = 1; - - // the implementation we tunnel through - private static final RpcEngine ENGINE = new WritableRpcEngine(); - - /** Tunnel an Avro RPC request and response through Hadoop's RPC. */ - private static interface TunnelProtocol extends VersionedProtocol { - //WritableRpcEngine expects a versionID in every protocol. - public static final long versionID = VERSION; - /** All Avro methods and responses go through this. */ - BufferListWritable call(String protocol, BufferListWritable request) - throws IOException; - } - - /** A Writable that holds a List, The Avro RPC Transceiver's - * basic unit of data transfer.*/ - private static class BufferListWritable implements Writable { - private List buffers; - - public BufferListWritable() {} // required for RPC Writables - - public BufferListWritable(List buffers) { - this.buffers = buffers; - } - - public void readFields(DataInput in) throws IOException { - int size = in.readInt(); - buffers = new ArrayList(size); - for (int i = 0; i < size; i++) { - int length = in.readInt(); - ByteBuffer buffer = ByteBuffer.allocate(length); - in.readFully(buffer.array(), 0, length); - buffers.add(buffer); - } - } - - public void write(DataOutput out) throws IOException { - out.writeInt(buffers.size()); - for (ByteBuffer buffer : buffers) { - out.writeInt(buffer.remaining()); - out.write(buffer.array(), buffer.position(), buffer.remaining()); - } - } - } - - /** An Avro RPC Transceiver that tunnels client requests through Hadoop - * RPC. */ - private static class ClientTransceiver extends Transceiver { - private TunnelProtocol tunnel; - private InetSocketAddress remote; - private String protocol; - - public ClientTransceiver(InetSocketAddress addr, - UserGroupInformation ticket, - Configuration conf, SocketFactory factory, - int rpcTimeout, String protocol) - throws IOException { - this.tunnel = ENGINE.getProxy(TunnelProtocol.class, VERSION, - addr, ticket, conf, factory, - rpcTimeout).getProxy(); - this.remote = addr; - this.protocol = protocol; - } - - public String getRemoteName() { return remote.toString(); } - - public List transceive(List request) - throws IOException { - return tunnel.call(protocol, new BufferListWritable(request)).buffers; - } - - public List readBuffers() throws IOException { - throw new UnsupportedOperationException(); - } - - public void writeBuffers(List buffers) throws IOException { - throw new UnsupportedOperationException(); - } - - public void close() throws IOException { - RPC.stopProxy(tunnel); - } - } - - /** Construct a client-side proxy object that implements the named protocol, - * talking to a server at the named address. - * @param */ - @SuppressWarnings("unchecked") - public ProtocolProxy getProxy(Class protocol, long clientVersion, - InetSocketAddress addr, UserGroupInformation ticket, - Configuration conf, SocketFactory factory, - int rpcTimeout) - throws IOException { - return new ProtocolProxy(protocol, - (T)Proxy.newProxyInstance( - protocol.getClassLoader(), - new Class[] { protocol }, - new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)), - false); - } - - private class Invoker implements InvocationHandler, Closeable { - private final ClientTransceiver tx; - private final SpecificRequestor requestor; - public Invoker(Class protocol, InetSocketAddress addr, - UserGroupInformation ticket, Configuration conf, - SocketFactory factory, - int rpcTimeout) throws IOException { - this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout, - protocol.getName()); - this.requestor = createRequestor(protocol, tx); - } - @Override public Object invoke(Object proxy, Method method, Object[] args) - throws Throwable { - return requestor.invoke(proxy, method, args); - } - public void close() throws IOException { - tx.close(); - } - } - - protected SpecificRequestor createRequestor(Class protocol, - Transceiver transeiver) throws IOException { - return new ReflectRequestor(protocol, transeiver); - } - - protected Responder createResponder(Class iface, Object impl) { - return new ReflectResponder(iface, impl); - } - - /** An Avro RPC Responder that can process requests passed via Hadoop RPC. */ - private class TunnelResponder implements TunnelProtocol { - private Map responders = - new HashMap(); - - public void addProtocol(Class iface, Object impl) { - responders.put(iface.getName(), createResponder(iface, impl)); - } - - @Override - public long getProtocolVersion(String protocol, long version) - throws IOException { - return VERSION; - } - - @Override - public ProtocolSignature getProtocolSignature( - String protocol, long version, int clientMethodsHashCode) - throws IOException { - return ProtocolSignature.getProtocolSignature - (clientMethodsHashCode, VERSION, TunnelProtocol.class); - } - - public BufferListWritable call(String protocol, BufferListWritable request) - throws IOException { - Responder responder = responders.get(protocol); - if (responder == null) - throw new IOException("No responder for: "+protocol); - return new BufferListWritable(responder.respond(request.buffers)); - } - - } - - public Object[] call(Method method, Object[][] params, - InetSocketAddress[] addrs, UserGroupInformation ticket, - Configuration conf) throws IOException { - throw new UnsupportedOperationException(); - } - - private class Server extends WritableRpcEngine.Server { - private TunnelResponder responder = new TunnelResponder(); - - public Server(Class iface, Object impl, String bindAddress, - int port, int numHandlers, int numReaders, - int queueSizePerHandler, boolean verbose, - Configuration conf, - SecretManager secretManager - ) throws IOException { - super((Class)null, new Object(), conf, - bindAddress, port, numHandlers, numReaders, - queueSizePerHandler, verbose, secretManager); - // RpcKind is WRITABLE since Avro is tunneled through WRITABLE - super.addProtocol(RpcKind.RPC_WRITABLE, TunnelProtocol.class, responder); - responder.addProtocol(iface, impl); - } - - - @Override - public Server - addProtocol(RpcKind rpcKind, Class protocolClass, Object protocolImpl) - throws IOException { - responder.addProtocol(protocolClass, protocolImpl); - return this; - } - } - - /** Construct a server for a protocol implementation instance listening on a - * port and address. */ - public RPC.Server getServer(Class iface, Object impl, String bindAddress, - int port, int numHandlers, int numReaders, - int queueSizePerHandler, boolean verbose, - Configuration conf, - SecretManager secretManager - ) throws IOException { - return new Server - (iface, impl, bindAddress, port, numHandlers, numReaders, - queueSizePerHandler, verbose, conf, secretManager); - } - -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroSpecificRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroSpecificRpcEngine.java deleted file mode 100644 index 995a13a9c73..00000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroSpecificRpcEngine.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ipc; - -import java.io.IOException; - -import org.apache.avro.ipc.Responder; -import org.apache.avro.ipc.Transceiver; -import org.apache.avro.ipc.specific.SpecificRequestor; -import org.apache.avro.ipc.specific.SpecificResponder; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * AvroRpcEngine which uses Avro's "specific" APIs. The protocols generated - * via Avro IDL needs to use this Engine. - */ -@InterfaceStability.Evolving -public class AvroSpecificRpcEngine extends AvroRpcEngine { - - protected SpecificRequestor createRequestor(Class protocol, - Transceiver transeiver) throws IOException { - return new SpecificRequestor(protocol, transeiver); - } - - protected Responder createResponder(Class iface, Object impl) { - return new SpecificResponder(iface, impl); - } - -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java index 430e0a9dea2..6e97159fb46 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java @@ -56,9 +56,8 @@ public class RpcPayloadHeader implements Writable { public enum RpcKind { RPC_BUILTIN ((short) 1), // Used for built in calls by tests RPC_WRITABLE ((short) 2), // Use WritableRpcEngine - RPC_PROTOCOL_BUFFER ((short) 3), // Use ProtobufRpcEngine - RPC_AVRO ((short) 4); // Use AvroRpcEngine - static final short MAX_INDEX = RPC_AVRO.value; // used for array size + RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine + final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size private static final short FIRST_INDEX = RPC_BUILTIN.value; private final short value; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/AvroTestProtocol.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/AvroTestProtocol.java deleted file mode 100644 index d5d73962e34..00000000000 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/AvroTestProtocol.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ipc; - -import org.apache.avro.AvroRemoteException; - -@SuppressWarnings("serial") -public interface AvroTestProtocol { - public static class Problem extends AvroRemoteException { - public Problem() {} - } - void ping(); - String echo(String value); - int add(int v1, int v2); - int error() throws Problem; -} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java deleted file mode 100644 index 5ce3359428a..00000000000 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ipc; - -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import javax.security.sasl.Sasl; - -import junit.framework.Assert; -import junit.framework.TestCase; - -import org.apache.avro.AvroRemoteException; -import org.apache.avro.util.Utf8; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; -import org.apache.hadoop.ipc.TestSaslRPC.CustomSecurityInfo; -import org.apache.hadoop.ipc.TestSaslRPC.TestTokenIdentifier; -import org.apache.hadoop.ipc.TestSaslRPC.TestTokenSecretManager; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SaslRpcServer; -import org.apache.hadoop.security.SecurityInfo; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.fs.CommonConfigurationKeys; - -/** Unit tests for AvroRpc. */ -public class TestAvroRpc extends TestCase { - private static final String ADDRESS = "0.0.0.0"; - - public static final Log LOG = - LogFactory.getLog(TestAvroRpc.class); - - int datasize = 1024*100; - int numThreads = 50; - - public TestAvroRpc(String name) { super(name); } - - public static interface EmptyProtocol {} - public static class EmptyImpl implements EmptyProtocol {} - - public static class TestImpl implements AvroTestProtocol { - - public void ping() {} - - public String echo(String value) { return value; } - - public int add(int v1, int v2) { return v1 + v2; } - - public int error() throws Problem { - throw new Problem(); - } - } - - public void testReflect() throws Exception { - testReflect(false); - } - - public void testSecureReflect() throws Exception { - testReflect(true); - } - - public void testSpecific() throws Exception { - testSpecific(false); - } - - public void testSecureSpecific() throws Exception { - testSpecific(true); - } - - private void testReflect(boolean secure) throws Exception { - Configuration conf = new Configuration(); - TestTokenSecretManager sm = null; - if (secure) { - makeSecure(conf); - sm = new TestTokenSecretManager(); - } - UserGroupInformation.setConfiguration(conf); - RPC.setProtocolEngine(conf, EmptyProtocol.class, AvroRpcEngine.class); - RPC.setProtocolEngine(conf, AvroTestProtocol.class, AvroRpcEngine.class); - RPC.Server server = RPC.getServer(EmptyProtocol.class, new EmptyImpl(), - ADDRESS, 0, 5, true, conf, sm); - server.addProtocol(RpcKind.RPC_WRITABLE, - AvroTestProtocol.class, new TestImpl()); - - try { - server.start(); - InetSocketAddress addr = NetUtils.getConnectAddress(server); - - if (secure) { - addToken(sm, addr); - //QOP must be auth - Assert.assertEquals("auth", SaslRpcServer.SASL_PROPS.get(Sasl.QOP)); - } - - AvroTestProtocol proxy = - (AvroTestProtocol)RPC.getProxy(AvroTestProtocol.class, 0, addr, conf); - - proxy.ping(); - - String echo = proxy.echo("hello world"); - assertEquals("hello world", echo); - - int intResult = proxy.add(1, 2); - assertEquals(3, intResult); - - boolean caught = false; - try { - proxy.error(); - } catch (AvroRemoteException e) { - if(LOG.isDebugEnabled()) { - LOG.debug("Caught " + e); - } - caught = true; - } - assertTrue(caught); - - } finally { - resetSecurity(); - server.stop(); - } - } - - private void makeSecure(Configuration conf) { - conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - conf.set("hadoop.rpc.socket.factory.class.default", ""); - //Avro doesn't work with security annotations on protocol. - //Avro works ONLY with custom security context - SecurityUtil.setSecurityInfoProviders(new CustomSecurityInfo()); - } - - private void resetSecurity() { - SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]); - } - - private void addToken(TestTokenSecretManager sm, - InetSocketAddress addr) throws IOException { - final UserGroupInformation current = UserGroupInformation.getCurrentUser(); - - TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current - .getUserName())); - Token token = new Token(tokenId, - sm); - Text host = new Text(addr.getAddress().getHostAddress() + ":" - + addr.getPort()); - token.setService(host); - LOG.info("Service IP address for token is " + host); - current.addToken(token); - } - - private void testSpecific(boolean secure) throws Exception { - Configuration conf = new Configuration(); - TestTokenSecretManager sm = null; - if (secure) { - makeSecure(conf); - sm = new TestTokenSecretManager(); - } - UserGroupInformation.setConfiguration(conf); - RPC.setProtocolEngine(conf, AvroSpecificTestProtocol.class, - AvroSpecificRpcEngine.class); - Server server = RPC.getServer(AvroSpecificTestProtocol.class, - new AvroSpecificTestProtocolImpl(), ADDRESS, 0, 5, true, - conf, sm); - try { - server.start(); - InetSocketAddress addr = NetUtils.getConnectAddress(server); - - if (secure) { - addToken(sm, addr); - //QOP must be auth - Assert.assertEquals("auth", SaslRpcServer.SASL_PROPS.get(Sasl.QOP)); - } - - AvroSpecificTestProtocol proxy = - (AvroSpecificTestProtocol)RPC.getProxy(AvroSpecificTestProtocol.class, - 0, addr, conf); - - CharSequence echo = proxy.echo("hello world"); - assertEquals("hello world", echo.toString()); - - int intResult = proxy.add(1, 2); - assertEquals(3, intResult); - - } finally { - resetSecurity(); - server.stop(); - } - } - - public static class AvroSpecificTestProtocolImpl implements - AvroSpecificTestProtocol { - - @Override - public int add(int arg1, int arg2) throws AvroRemoteException { - return arg1 + arg2; - } - - @Override - public CharSequence echo(CharSequence msg) throws AvroRemoteException { - return msg; - } - - } - -}