From ea50f154077e724cc1b2fe15565ede2f2dc2e6f4 Mon Sep 17 00:00:00 2001 From: Doug Cutting Date: Fri, 18 Sep 2009 17:47:08 +0000 Subject: [PATCH] HADOOP-6170. Add facility to tunnel Avro RPCs through Hadoop RPCs. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@816727 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 4 + build.xml | 8 + ivy.xml | 10 +- ivy/libraries.properties | 6 + .../avro/AvroGenericSerialization.java | 2 +- .../avro/AvroReflectSerialization.java | 4 +- .../avro/AvroSpecificSerialization.java | 4 +- src/java/org/apache/hadoop/ipc/AvroRpc.java | 223 ++++++++++++++++++ .../apache/hadoop/ipc/AvroTestProtocol.java | 33 +++ .../org/apache/hadoop/ipc/TestAvroRpc.java | 94 ++++++++ 10 files changed, 380 insertions(+), 8 deletions(-) create mode 100644 src/java/org/apache/hadoop/ipc/AvroRpc.java create mode 100644 src/test/core/org/apache/hadoop/ipc/AvroTestProtocol.java create mode 100644 src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java diff --git a/CHANGES.txt b/CHANGES.txt index 8370346c1c9..35b6f19d011 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -196,6 +196,10 @@ Trunk (unreleased changes) HADOOP-4952. Add new improved file system interface FileContext for the application writer (Sanjay Radia via suresh) + HADOOP-6170. Add facility to tunnel Avro RPCs through Hadoop RPCs. + This permits one to take advantage of both Avro's RPC versioning + features and Hadoop's proven RPC scalability. (cutting) + IMPROVEMENTS HADOOP-4565. Added CombineFileInputFormat to use data locality information diff --git a/build.xml b/build.xml index 95c59023ecd..f17f66bab64 100644 --- a/build.xml +++ b/build.xml @@ -460,6 +460,14 @@ + + + + + diff --git a/ivy.xml b/ivy.xml index 5647e235974..b1bf5c6aadf 100644 --- a/ivy.xml +++ b/ivy.xml @@ -271,15 +271,19 @@ + diff --git a/ivy/libraries.properties b/ivy/libraries.properties index 2d108bf761d..e2ce4f8f041 100644 --- a/ivy/libraries.properties +++ b/ivy/libraries.properties @@ -16,6 +16,8 @@ #These are the versions of our dependencies (in alphabetical order) apacheant.version=1.7.0 +avro.version=1.1.0 + checkstyle.version=4.2 commons-cli.version=1.2 @@ -42,6 +44,8 @@ hsqldb.version=1.8.0.10 #ivy.version=2.0.0-beta2 ivy.version=2.0.0-rc2 +jackson.version=1.0.1 + jasper.version=5.5.12 jsp.version=2.1 jsp-api.version=5.5.12 @@ -61,6 +65,8 @@ mina-core.version=2.0.0-M5 oro.version=2.0.8 +paranamer.version=1.5 + rats-lib.version=0.6 servlet.version=4.0.6 diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java index 2ea4cdd3435..52f25aec6a7 100644 --- a/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java @@ -52,7 +52,7 @@ public class AvroGenericSerialization extends AvroSerialization { @Override protected Schema getSchema(Object t, Map metadata) { String jsonSchema = metadata.get(AVRO_SCHEMA_KEY); - return jsonSchema != null ? Schema.parse(jsonSchema) : GenericData.induce(t); + return jsonSchema != null ? Schema.parse(jsonSchema) : GenericData.get().induce(t); } @Override diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java index 649c72e8a76..4a8cf10fd71 100644 --- a/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java @@ -85,7 +85,7 @@ public class AvroReflectSerialization extends AvroSerialization{ || "null".equals(clazz.getEnclosingClass().getName())) ? clazz.getPackage().getName() + "." : (clazz.getEnclosingClass().getName() + "$")); - return new ReflectDatumReader(ReflectData.getSchema(clazz), prefix); + return new ReflectDatumReader(ReflectData.get().getSchema(clazz), prefix); } catch (Exception e) { throw new RuntimeException(e); } @@ -93,7 +93,7 @@ public class AvroReflectSerialization extends AvroSerialization{ @Override protected Schema getSchema(Object t, Map metadata) { - return ReflectData.getSchema(t.getClass()); + return ReflectData.get().getSchema(t.getClass()); } @Override diff --git a/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java b/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java index a953e04d4ab..3b7cd989b32 100644 --- a/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java +++ b/src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java @@ -50,7 +50,7 @@ public class AvroSpecificSerialization try { Class clazz = (Class) getClassFromMetadata(metadata); - return new SpecificDatumReader(clazz.newInstance().schema()); + return new SpecificDatumReader(clazz.newInstance().getSchema()); } catch (Exception e) { throw new RuntimeException(e); } @@ -58,7 +58,7 @@ public class AvroSpecificSerialization @Override protected Schema getSchema(SpecificRecord t, Map metadata) { - return t.schema(); + return t.getSchema(); } @Override diff --git a/src/java/org/apache/hadoop/ipc/AvroRpc.java b/src/java/org/apache/hadoop/ipc/AvroRpc.java new file mode 100644 index 00000000000..4fbd9668641 --- /dev/null +++ b/src/java/org/apache/hadoop/ipc/AvroRpc.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.io.*; +import java.util.*; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import javax.net.SocketFactory; +import javax.security.auth.login.LoginException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.net.NetUtils; + +import org.apache.avro.*; +import org.apache.avro.ipc.*; +import org.apache.avro.reflect.*; + +/** Tunnel Avro-format RPC requests over a Hadoop {@link RPC} connection. This + * does not give cross-language wire compatibility, since the Hadoop RPC wire + * format is non-standard, but it does permit use of Avro's protocol versioning + * features for inter-Java RPCs. */ +public class AvroRpc { + private static int VERSION = 0; + + /** Tunnel an Avro RPC request and response through Hadoop's RPC. */ + private static interface TunnelProtocol extends VersionedProtocol { + /** All Avro methods and responses go through this. */ + BufferListWritable call(BufferListWritable request) throws IOException; + } + + /** A Writable that holds a List, The Avro RPC Transceiver's + * basic unit of data transfer.*/ + private static class BufferListWritable implements Writable { + private List buffers; + + public BufferListWritable() {} // required for RPC Writables + + public BufferListWritable(List buffers) { + this.buffers = buffers; + } + + public void readFields(DataInput in) throws IOException { + int size = in.readInt(); + buffers = new ArrayList(size); + for (int i = 0; i < size; i++) { + int length = in.readInt(); + ByteBuffer buffer = ByteBuffer.allocate(length); + in.readFully(buffer.array(), 0, length); + buffers.add(buffer); + } + } + + public void write(DataOutput out) throws IOException { + out.writeInt(buffers.size()); + for (ByteBuffer buffer : buffers) { + out.writeInt(buffer.remaining()); + out.write(buffer.array(), buffer.position(), buffer.remaining()); + } + } + } + + /** An Avro RPC Transceiver that tunnels client requests through Hadoop + * RPC. */ + private static class ClientTransceiver extends Transceiver { + private TunnelProtocol tunnel; + private InetSocketAddress remote; + + public ClientTransceiver(InetSocketAddress addr, + UserGroupInformation ticket, + Configuration conf, SocketFactory factory) + throws IOException { + this.tunnel = (TunnelProtocol)RPC.getProxy(TunnelProtocol.class, VERSION, + addr, ticket, conf, factory); + this.remote = addr; + } + + public String getRemoteName() { return remote.toString(); } + + public List transceive(List request) + throws IOException { + return tunnel.call(new BufferListWritable(request)).buffers; + } + + public List readBuffers() throws IOException { + throw new UnsupportedOperationException(); + } + + public void writeBuffers(List buffers) throws IOException { + throw new UnsupportedOperationException(); + } + + public void close() throws IOException {} + } + + private static class Invoker extends ReflectRequestor { + public Invoker(Protocol protocol, Transceiver transceiver) + throws IOException { + super(protocol, transceiver); + } + } + + /** Construct a client-side proxy object that implements the named protocol, + * talking to a server at the named address. */ + public static Object getProxy(Class protocol, + InetSocketAddress addr, + Configuration conf) + throws IOException { + UserGroupInformation ugi = null; + try { + ugi = UserGroupInformation.login(conf); + } catch (LoginException le) { + throw new RuntimeException("Couldn't login!"); + } + return getProxy(protocol, addr, ugi, conf, + NetUtils.getDefaultSocketFactory(conf)); + } + + /** Construct a client-side proxy object that implements the named protocol, + * talking to a server at the named address. */ + public static Object getProxy + (final Class protocol, final InetSocketAddress addr, + final UserGroupInformation ticket, + final Configuration conf, final SocketFactory factory) + throws IOException { + + return Proxy.newProxyInstance + (protocol.getClassLoader(), new Class[] { protocol }, + new InvocationHandler() { + public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + return new Invoker + (ReflectData.get().getProtocol(protocol), + new ClientTransceiver(addr, ticket, conf, factory)) + .invoke(proxy, method, args); + } + }); + } + + /** An Avro RPC Transceiver that provides a request passed through Hadoop RPC + * to the Avro RPC Responder for processing. */ + private static class ServerTransceiver extends Transceiver { + List request; + + public ServerTransceiver(List request) { + this.request = request; + } + + public String getRemoteName() { return "remote"; } + + public List readBuffers() throws IOException { + return request; + } + + public void writeBuffers(List buffers) throws IOException { + throw new UnsupportedOperationException(); + } + + public void close() throws IOException {} + } + + /** An Avro RPC Responder that can process requests passed via Hadoop RPC. */ + private static class TunnelResponder extends ReflectResponder + implements TunnelProtocol { + + public TunnelResponder(Class iface, Object impl) { + super(iface, impl); + } + + public long getProtocolVersion(String protocol, long version) + throws IOException { + return VERSION; + } + + public BufferListWritable call(final BufferListWritable request) + throws IOException { + return new BufferListWritable + (respond(new ServerTransceiver(request.buffers))); + } + } + + /** Construct a server for a protocol implementation instance listening on a + * port and address. */ + public static Server getServer(Object impl, String bindAddress, int port, + Configuration conf) + throws IOException { + return RPC.getServer(new TunnelResponder(impl.getClass(), impl), + bindAddress, port, conf); + + } + + /** Construct a server for a protocol implementation instance listening on a + * port and address. */ + public static RPC.Server getServer(Object impl, String bindAddress, int port, + int numHandlers, boolean verbose, + Configuration conf) + throws IOException { + return RPC.getServer(new TunnelResponder(impl.getClass(), impl), + bindAddress, port, numHandlers, verbose, conf); + } + +} diff --git a/src/test/core/org/apache/hadoop/ipc/AvroTestProtocol.java b/src/test/core/org/apache/hadoop/ipc/AvroTestProtocol.java new file mode 100644 index 00000000000..10f210eb47f --- /dev/null +++ b/src/test/core/org/apache/hadoop/ipc/AvroTestProtocol.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import org.apache.avro.ipc.AvroRemoteException; +import org.apache.avro.util.Utf8; + +@SuppressWarnings("serial") +public interface AvroTestProtocol { + public static class Problem extends AvroRemoteException { + public Problem() {} + } + void ping(); + Utf8 echo(Utf8 value); + int add(int v1, int v2); + int error() throws Problem; +} diff --git a/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java b/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java new file mode 100644 index 00000000000..703d8cb83c2 --- /dev/null +++ b/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import junit.framework.TestCase; + +import org.apache.commons.logging.*; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.net.NetUtils; + +import org.apache.avro.ipc.AvroRemoteException; +import org.apache.avro.util.Utf8; + +/** Unit tests for AvroRpc. */ +public class TestAvroRpc extends TestCase { + private static final String ADDRESS = "0.0.0.0"; + + public static final Log LOG = + LogFactory.getLog(TestAvroRpc.class); + + private static Configuration conf = new Configuration(); + + int datasize = 1024*100; + int numThreads = 50; + + public TestAvroRpc(String name) { super(name); } + + public static class TestImpl implements AvroTestProtocol { + + public void ping() {} + + public Utf8 echo(Utf8 value) { return value; } + + public int add(int v1, int v2) { return v1 + v2; } + + public int error() throws Problem { + throw new Problem(); + } + } + + public void testCalls() throws Exception { + Configuration conf = new Configuration(); + Server server = AvroRpc.getServer(new TestImpl(), ADDRESS, 0, conf); + AvroTestProtocol proxy = null; + try { + server.start(); + + InetSocketAddress addr = NetUtils.getConnectAddress(server); + proxy = + (AvroTestProtocol)AvroRpc.getProxy(AvroTestProtocol.class, addr, conf); + + proxy.ping(); + + Utf8 utf8Result = proxy.echo(new Utf8("hello world")); + assertEquals(new Utf8("hello world"), utf8Result); + + int intResult = proxy.add(1, 2); + assertEquals(3, intResult); + + boolean caught = false; + try { + proxy.error(); + } catch (AvroRemoteException e) { + LOG.debug("Caught " + e); + caught = true; + } + assertTrue(caught); + + } finally { + server.stop(); + } + } +}