HADOOP-6930. AvroRpcEngine doesn't work with generated Avro code. Contributed by Sharad Agarwal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@993529 13f79535-47bb-0310-9956-ffa450edef68
Sharad Agarwal 2010-09-07 20:49:23 +00:00
parent becf8e919a
commit 086223892e
9 changed files with 200 additions and 35 deletions

View File

@@ -235,6 +235,9 @@ Trunk (unreleased changes)
HADOOP-6938. ConnectionId.getRemotePrincipal() should check if security
is enabled. (Kan Zhang via hairong)
HADOOP-6930. AvroRpcEngine doesn't work with generated Avro code.
(sharad)
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@@ -473,6 +473,17 @@
</schema>
</target>
<target name="generate-avro-protocols" depends="init, ivy-retrieve-test">
<taskdef name="schema" classname="org.apache.avro.specific.ProtocolTask">
<classpath refid="test.classpath"/>
</taskdef>
<schema destdir="${test.generated.dir}">
<fileset dir="${test.src.dir}">
<include name="**/*.avpr" />
</fileset>
</schema>
</target>
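Note: with this target wired into the test compile (next hunk), every *.avpr under ${test.src.dir} is run through Avro's ProtocolTask and the generated Java sources land in ${test.generated.dir}. For instance, the AvroSpecificTestProtocol.avpr added later in this patch should produce an org.apache.hadoop.ipc.AvroSpecificTestProtocol class there once the test compile runs (e.g. ant compile-core-test, assuming the usual top-level invocation of this build file).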
<!-- ================================================================== -->
<!-- Compile test code -->
<!-- ================================================================== -->
@@ -480,7 +491,7 @@
<target name="-classes-compilation"
depends="compile-core-classes, compile-core-test"/>
<target name="compile-core-test" depends="compile-core-classes, ivy-retrieve-test, generate-test-records, generate-avro-records">
<target name="compile-core-test" depends="compile-core-classes, ivy-retrieve-test, generate-test-records, generate-avro-records, generate-avro-protocols">
<mkdir dir="${test.core.build.classes}"/>
<javac
encoding="${build.encoding}"

View File

@@ -18,34 +18,40 @@
package org.apache.hadoop.ipc;
import java.io.*;
import java.util.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import javax.net.SocketFactory;
import javax.security.auth.login.LoginException;
import org.apache.commons.logging.*;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.reflect.ReflectRequestor;
import org.apache.avro.reflect.ReflectResponder;
import org.apache.avro.specific.SpecificRequestor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.avro.*;
import org.apache.avro.ipc.*;
import org.apache.avro.reflect.*;
/** Tunnel Avro-format RPC requests over a Hadoop {@link RPC} connection. This
* does not give cross-language wire compatibility, since the Hadoop RPC wire
* format is non-standard, but it does permit use of Avro's protocol versioning
* features for inter-Java RPCs. */
class AvroRpcEngine implements RpcEngine {
@InterfaceStability.Evolving
public class AvroRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class);
private static int VERSION = 0;
@@ -150,15 +156,15 @@ class AvroRpcEngine implements RpcEngine {
}
}
private static class Invoker implements InvocationHandler, Closeable {
private class Invoker implements InvocationHandler, Closeable {
private final ClientTransceiver tx;
private final ReflectRequestor requestor;
private final SpecificRequestor requestor;
public Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf,
SocketFactory factory,
int rpcTimeout) throws IOException {
this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout);
this.requestor = new ReflectRequestor(protocol, tx);
this.requestor = createRequestor(protocol, tx);
}
@Override public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
@@ -169,12 +175,20 @@ class AvroRpcEngine implements RpcEngine {
}
}
/** An Avro RPC Responder that can process requests passed via Hadoop RPC. */
private static class TunnelResponder extends ReflectResponder
implements TunnelProtocol {
protected SpecificRequestor createRequestor(Class<?> protocol,
Transceiver transceiver) throws IOException {
return new ReflectRequestor(protocol, transceiver);
}
protected Responder createResponder(Class<?> iface, Object impl) {
return new ReflectResponder(iface, impl);
}
/** An Avro RPC Responder that can process requests passed via Hadoop RPC. */
private class TunnelResponder implements TunnelProtocol {
private Responder responder;
public TunnelResponder(Class<?> iface, Object impl) {
super(iface, impl);
responder = createResponder(iface, impl);
}
public long getProtocolVersion(String protocol, long version)
@@ -184,7 +198,7 @@ class AvroRpcEngine implements RpcEngine {
public BufferListWritable call(final BufferListWritable request)
throws IOException {
return new BufferListWritable(respond(request.buffers));
return new BufferListWritable(responder.respond(request.buffers));
}
}
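Note: in practice an engine is selected per protocol through RPC.setProtocolEngine (made public by this patch, see the RPC change below), after which the normal RPC server and proxy entry points are used unchanged. A condensed server-side sketch, with placeholder names (MyAvroProtocol and MyAvroProtocolImpl are illustrative, not part of this patch; the tests at the end of the patch show the full round trip):

    // Hedged sketch: serving a plain Java interface over the Avro tunnel.
    // MyAvroProtocol / MyAvroProtocolImpl are placeholders.
    Configuration conf = new Configuration();
    RPC.setProtocolEngine(conf, MyAvroProtocol.class, AvroRpcEngine.class);
    Server server = RPC.getServer(MyAvroProtocol.class, new MyAvroProtocolImpl(),
                                  "0.0.0.0", 0, conf);
    server.start();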

View File

@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.specific.SpecificRequestor;
import org.apache.avro.specific.SpecificResponder;
import org.apache.hadoop.classification.InterfaceStability;
/**
* AvroRpcEngine which uses Avro's "specific" APIs. Protocols generated
* via Avro IDL need to use this engine.
*/
@InterfaceStability.Evolving
public class AvroSpecificRpcEngine extends AvroRpcEngine {
protected SpecificRequestor createRequestor(Class<?> protocol,
Transceiver transceiver) throws IOException {
return new SpecificRequestor(protocol, transceiver);
}
protected Responder createResponder(Class<?> iface, Object impl) {
return new SpecificResponder(iface, impl);
}
}
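Note: client-side usage mirrors the new testAvroSpecificRpc case at the end of this patch: bind the generated interface to AvroSpecificRpcEngine, then call it through an ordinary RPC proxy. A minimal sketch (addr is the server's connect address):

    // Hedged sketch mirroring the new test case below.
    Configuration conf = new Configuration();
    RPC.setProtocolEngine(conf, AvroSpecificTestProtocol.class,
                          AvroSpecificRpcEngine.class);
    AvroSpecificTestProtocol proxy =
        (AvroSpecificTestProtocol) RPC.getProxy(AvroSpecificTestProtocol.class,
                                                0, addr, conf); // addr: InetSocketAddress
    int sum = proxy.add(1, 2); // expected to return 3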

View File

@@ -78,8 +78,13 @@ public class RPC {
private static final String ENGINE_PROP = "rpc.engine";
// set a protocol to use a non-default RpcEngine
static void setProtocolEngine(Configuration conf,
/**
* Set a protocol to use a non-default RpcEngine.
* @param conf configuration to use
* @param protocol the protocol interface
* @param engine the RpcEngine impl
*/
public static void setProtocolEngine(Configuration conf,
Class protocol, Class engine) {
conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class);
}
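Note: the call above stores the engine class under the rpc.engine.&lt;protocol name&gt; key visible in this hunk, so its effect can be spelled out directly against the Configuration. A sketch using the protocol and engine added elsewhere in this patch (RpcEngine is made public below, which is what allows the class literal here):

    conf.setClass("rpc.engine." + AvroSpecificTestProtocol.class.getName(),
                  AvroSpecificRpcEngine.class, RpcEngine.class);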

View File

@@ -18,18 +18,21 @@
package org.apache.hadoop.ipc;
import java.lang.reflect.Method;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import javax.net.SocketFactory;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.conf.Configuration;
/** An RPC implementation. */
interface RpcEngine {
@InterfaceStability.Evolving
public interface RpcEngine {
/** Construct a client-side proxy object. */
Object getProxy(Class<?> protocol,

View File

@@ -44,7 +44,8 @@ import org.apache.hadoop.conf.*;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
/** An RpcEngine implementation for Writable data. */
class WritableRpcEngine implements RpcEngine {
@InterfaceStability.Evolving
public class WritableRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class);
/** A method invocation, including the method name and its parameters.*/

View File

@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"protocol" : "AvroSpecificTestProtocol",
"namespace" : "org.apache.hadoop.ipc",
"messages" : {
"echo" : {
"request" : [ {
"name" : "message",
"type" : "string"
} ],
"response" : "string"
},
"add" : {
"request" : [ {
"name" : "arg1",
"type" : "int"
}, {
"name" : "arg2",
"type" : "int",
"default" : 0
} ],
"response" : "int"
}
}
}
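Note: running this protocol through Avro's specific compiler (the generate-avro-protocols target above) should yield roughly the following Java interface, which the new test implements. This is a sketch of the expected shape, not the literal generated file:

    package org.apache.hadoop.ipc;

    import org.apache.avro.ipc.AvroRemoteException;
    import org.apache.avro.util.Utf8;

    // Expected shape of the code generated from AvroSpecificTestProtocol.avpr.
    public interface AvroSpecificTestProtocol {
      Utf8 echo(Utf8 message) throws AvroRemoteException;
      int add(int arg1, int arg2) throws AvroRemoteException;
    }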

View File

@@ -18,18 +18,16 @@
package org.apache.hadoop.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import junit.framework.TestCase;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.util.Utf8;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
/** Unit tests for AvroRpc. */
public class TestAvroRpc extends TestCase {
@@ -94,4 +92,47 @@ public class TestAvroRpc extends TestCase {
server.stop();
}
}
public void testAvroSpecificRpc() throws Exception {
Configuration conf = new Configuration();
RPC.setProtocolEngine(conf, AvroSpecificTestProtocol.class,
AvroSpecificRpcEngine.class);
Server server = RPC.getServer(AvroSpecificTestProtocol.class,
new AvroSpecificTestProtocolImpl(),
ADDRESS, 0, conf);
AvroSpecificTestProtocol proxy = null;
try {
server.start();
InetSocketAddress addr = NetUtils.getConnectAddress(server);
proxy =
(AvroSpecificTestProtocol)RPC.getProxy(AvroSpecificTestProtocol.class,
0, addr, conf);
Utf8 echo = proxy.echo(new Utf8("hello world"));
assertEquals("hello world", echo.toString());
int intResult = proxy.add(1, 2);
assertEquals(3, intResult);
} finally {
server.stop();
}
}
public static class AvroSpecificTestProtocolImpl implements
AvroSpecificTestProtocol {
@Override
public int add(int arg1, int arg2) throws AvroRemoteException {
return arg1 + arg2;
}
@Override
public Utf8 echo(Utf8 msg) throws AvroRemoteException {
return msg;
}
}
}