HADOOP-7773. Add support for protocol buffer based RPC engine. Contributed by Suresh Srinivas.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1190611 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1bcff6860c
commit
55d3dc50d1
|
@ -89,6 +89,9 @@ Trunk (unreleased changes)
|
||||||
|
|
||||||
HADOOP-7761. Improve the performance of raw comparisons. (todd)
|
HADOOP-7761. Improve the performance of raw comparisons. (todd)
|
||||||
|
|
||||||
|
HADOOP-7773. Add support for protocol buffer based RPC engine.
|
||||||
|
(suresh)
|
||||||
|
|
||||||
Release 0.23.0 - Unreleased
|
Release 0.23.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -270,4 +270,8 @@
|
||||||
<!-- backward compatibility -->
|
<!-- backward compatibility -->
|
||||||
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
|
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
|
||||||
</Match>
|
</Match>
|
||||||
|
<Match>
|
||||||
|
<!-- protobuf generated code -->
|
||||||
|
<Class name="org.apache.hadoop.ipc.protobuf.HadoopRpcProtos"/>
|
||||||
|
</Match>
|
||||||
</FindBugsFilter>
|
</FindBugsFilter>
|
||||||
|
|
|
@ -0,0 +1,389 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.InvocationHandler;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.lang.reflect.Proxy;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
import javax.net.SocketFactory;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto.ResponseStatus;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.protobuf.BlockingService;
|
||||||
|
import com.google.protobuf.Descriptors.MethodDescriptor;
|
||||||
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
|
import com.google.protobuf.Message;
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RPC Engine for for protobuf based RPCs.
|
||||||
|
*/
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class ProtobufRpcEngine implements RpcEngine {
|
||||||
|
private static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
|
||||||
|
|
||||||
|
private static final ClientCache CLIENTS = new ClientCache();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
|
||||||
|
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
|
||||||
|
SocketFactory factory, int rpcTimeout) throws IOException {
|
||||||
|
|
||||||
|
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(protocol
|
||||||
|
.getClassLoader(), new Class[] { protocol }, new Invoker(protocol,
|
||||||
|
addr, ticket, conf, factory, rpcTimeout)), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class Invoker implements InvocationHandler, Closeable {
|
||||||
|
private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
|
||||||
|
private boolean isClosed = false;
|
||||||
|
private Client.ConnectionId remoteId;
|
||||||
|
private Client client;
|
||||||
|
|
||||||
|
public Invoker(Class<?> protocol, InetSocketAddress addr,
|
||||||
|
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
|
||||||
|
int rpcTimeout) throws IOException {
|
||||||
|
this.remoteId = Client.ConnectionId.getConnectionId(addr, protocol,
|
||||||
|
ticket, rpcTimeout, conf);
|
||||||
|
this.client = CLIENTS.getClient(conf, factory,
|
||||||
|
RpcResponseWritable.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
private HadoopRpcRequestProto constructRpcRequest(Method method,
|
||||||
|
Object[] params) throws ServiceException {
|
||||||
|
HadoopRpcRequestProto rpcRequest;
|
||||||
|
HadoopRpcRequestProto.Builder builder = HadoopRpcRequestProto
|
||||||
|
.newBuilder();
|
||||||
|
builder.setMethodName(method.getName());
|
||||||
|
|
||||||
|
if (params.length != 2) { // RpcController + Message
|
||||||
|
throw new ServiceException("Too many parameters for request. Method: ["
|
||||||
|
+ method.getName() + "]" + ", Expected: 2, Actual: "
|
||||||
|
+ params.length);
|
||||||
|
}
|
||||||
|
if (params[1] == null) {
|
||||||
|
throw new ServiceException("null param while calling Method: ["
|
||||||
|
+ method.getName() + "]");
|
||||||
|
}
|
||||||
|
|
||||||
|
Message param = (Message) params[1];
|
||||||
|
builder.setRequest(param.toByteString());
|
||||||
|
rpcRequest = builder.build();
|
||||||
|
return rpcRequest;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is the client side invoker of RPC method. It only throws
|
||||||
|
* ServiceException, since the invocation proxy expects only
|
||||||
|
* ServiceException to be thrown by the method in case protobuf service.
|
||||||
|
*
|
||||||
|
* ServiceException has the following causes:
|
||||||
|
* <ol>
|
||||||
|
* <li>Exceptions encountered in this methods are thrown as
|
||||||
|
* RpcClientException, wrapped in RemoteException</li>
|
||||||
|
* <li>Remote exceptions are thrown wrapped in RemoteException</li>
|
||||||
|
* </ol>
|
||||||
|
*
|
||||||
|
* Note that the client calling protobuf RPC methods, must handle
|
||||||
|
* ServiceException by getting the cause from the ServiceException. If the
|
||||||
|
* cause is RemoteException, then unwrap it to get the exception thrown by
|
||||||
|
* the server.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Object invoke(Object proxy, Method method, Object[] args)
|
||||||
|
throws ServiceException {
|
||||||
|
long startTime = 0;
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
startTime = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args);
|
||||||
|
RpcResponseWritable val = null;
|
||||||
|
try {
|
||||||
|
val = (RpcResponseWritable) client.call(
|
||||||
|
new RpcRequestWritable(rpcRequest), remoteId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
RpcClientException ce = new RpcClientException("Client exception", e);
|
||||||
|
throw new ServiceException(getRemoteException(ce));
|
||||||
|
}
|
||||||
|
|
||||||
|
HadoopRpcResponseProto response = val.message;
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
long callTime = System.currentTimeMillis() - startTime;
|
||||||
|
LOG.debug("Call: " + method.getName() + " " + callTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wrap the received message
|
||||||
|
ResponseStatus status = response.getStatus();
|
||||||
|
if (status != ResponseStatus.SUCCESS) {
|
||||||
|
RemoteException re = new RemoteException(response.getException()
|
||||||
|
.getExceptionName(), response.getException().getStackTrace());
|
||||||
|
re.fillInStackTrace();
|
||||||
|
throw new ServiceException(re);
|
||||||
|
}
|
||||||
|
|
||||||
|
Message prototype = null;
|
||||||
|
try {
|
||||||
|
prototype = getReturnProtoType(method);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
Message returnMessage;
|
||||||
|
try {
|
||||||
|
returnMessage = prototype.newBuilderForType()
|
||||||
|
.mergeFrom(response.getResponse()).build();
|
||||||
|
} catch (InvalidProtocolBufferException e) {
|
||||||
|
RpcClientException ce = new RpcClientException("Client exception", e);
|
||||||
|
throw new ServiceException(getRemoteException(ce));
|
||||||
|
}
|
||||||
|
return returnMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (!isClosed) {
|
||||||
|
isClosed = true;
|
||||||
|
CLIENTS.stopClient(client);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Message getReturnProtoType(Method method) throws Exception {
|
||||||
|
if (returnTypes.containsKey(method.getName())) {
|
||||||
|
return returnTypes.get(method.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
Class<?> returnType = method.getReturnType();
|
||||||
|
Method newInstMethod = returnType.getMethod("getDefaultInstance");
|
||||||
|
newInstMethod.setAccessible(true);
|
||||||
|
Message prototype = (Message) newInstMethod.invoke(null, (Object[]) null);
|
||||||
|
returnTypes.put(method.getName(), prototype);
|
||||||
|
return prototype;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object[] call(Method method, Object[][] params,
|
||||||
|
InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writable Wrapper for Protocol Buffer Requests
|
||||||
|
*/
|
||||||
|
private static class RpcRequestWritable implements Writable {
|
||||||
|
HadoopRpcRequestProto message;
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
public RpcRequestWritable() {
|
||||||
|
}
|
||||||
|
|
||||||
|
RpcRequestWritable(HadoopRpcRequestProto message) {
|
||||||
|
this.message = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
out.writeInt(message.toByteArray().length);
|
||||||
|
out.write(message.toByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
int length = in.readInt();
|
||||||
|
byte[] bytes = new byte[length];
|
||||||
|
in.readFully(bytes);
|
||||||
|
message = HadoopRpcRequestProto.parseFrom(bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writable Wrapper for Protocol Buffer Responses
|
||||||
|
*/
|
||||||
|
private static class RpcResponseWritable implements Writable {
|
||||||
|
HadoopRpcResponseProto message;
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
public RpcResponseWritable() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public RpcResponseWritable(HadoopRpcResponseProto message) {
|
||||||
|
this.message = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
out.writeInt(message.toByteArray().length);
|
||||||
|
out.write(message.toByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
int length = in.readInt();
|
||||||
|
byte[] bytes = new byte[length];
|
||||||
|
in.readFully(bytes);
|
||||||
|
message = HadoopRpcResponseProto.parseFrom(bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
static Client getClient(Configuration conf) {
|
||||||
|
return CLIENTS.getClient(conf, SocketFactory.getDefault(),
|
||||||
|
RpcResponseWritable.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RPC.Server getServer(Class<?> protocol, Object instance,
|
||||||
|
String bindAddress, int port, int numHandlers, int numReaders,
|
||||||
|
int queueSizePerHandler, boolean verbose, Configuration conf,
|
||||||
|
SecretManager<? extends TokenIdentifier> secretManager)
|
||||||
|
throws IOException {
|
||||||
|
return new Server(instance, conf, bindAddress, port, numHandlers,
|
||||||
|
numReaders, queueSizePerHandler, verbose, secretManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static RemoteException getRemoteException(Exception e) {
|
||||||
|
return new RemoteException(e.getClass().getName(),
|
||||||
|
StringUtils.stringifyException(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Server extends RPC.Server {
|
||||||
|
private BlockingService service;
|
||||||
|
private boolean verbose;
|
||||||
|
|
||||||
|
private static String classNameBase(String className) {
|
||||||
|
String[] names = className.split("\\.", -1);
|
||||||
|
if (names == null || names.length == 0) {
|
||||||
|
return className;
|
||||||
|
}
|
||||||
|
return names[names.length - 1];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct an RPC server.
|
||||||
|
*
|
||||||
|
* @param instance the instance whose methods will be called
|
||||||
|
* @param conf the configuration to use
|
||||||
|
* @param bindAddress the address to bind on to listen for connection
|
||||||
|
* @param port the port to listen for connections on
|
||||||
|
* @param numHandlers the number of method handler threads to run
|
||||||
|
* @param verbose whether each call should be logged
|
||||||
|
*/
|
||||||
|
public Server(Object instance, Configuration conf, String bindAddress,
|
||||||
|
int port, int numHandlers, int numReaders, int queueSizePerHandler,
|
||||||
|
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
|
||||||
|
throws IOException {
|
||||||
|
super(bindAddress, port, RpcRequestWritable.class, numHandlers,
|
||||||
|
numReaders, queueSizePerHandler, conf, classNameBase(instance
|
||||||
|
.getClass().getName()), secretManager);
|
||||||
|
this.service = (BlockingService) instance;
|
||||||
|
this.verbose = verbose;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a server side method, which is invoked over RPC. On success
|
||||||
|
* the return response has protobuf response payload. On failure, the
|
||||||
|
* exception name and the stack trace are return in the resposne. See {@link HadoopRpcResponseProto}
|
||||||
|
*
|
||||||
|
* In this method there three types of exceptions possible and they are
|
||||||
|
* returned in response as follows.
|
||||||
|
* <ol>
|
||||||
|
* <li> Exceptions encountered in this method that are returned as {@link RpcServerException} </li>
|
||||||
|
* <li> Exceptions thrown by the service is wrapped in ServiceException. In that
|
||||||
|
* this method returns in response the exception thrown by the service.</li>
|
||||||
|
* <li> Other exceptions thrown by the service. They are returned as
|
||||||
|
* it is.</li>
|
||||||
|
* </ol>
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Writable call(String protocol, Writable writableRequest,
|
||||||
|
long receiveTime) throws IOException {
|
||||||
|
RpcRequestWritable request = (RpcRequestWritable) writableRequest;
|
||||||
|
HadoopRpcRequestProto rpcRequest = request.message;
|
||||||
|
String methodName = rpcRequest.getMethodName();
|
||||||
|
if (verbose)
|
||||||
|
LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
|
||||||
|
MethodDescriptor methodDescriptor = service.getDescriptorForType()
|
||||||
|
.findMethodByName(methodName);
|
||||||
|
if (methodDescriptor == null) {
|
||||||
|
String msg = "Unknown method " + methodName + " called on " + protocol
|
||||||
|
+ " protocol.";
|
||||||
|
LOG.warn(msg);
|
||||||
|
return handleException(new RpcServerException(msg));
|
||||||
|
}
|
||||||
|
Message prototype = service.getRequestPrototype(methodDescriptor);
|
||||||
|
Message param = prototype.newBuilderForType()
|
||||||
|
.mergeFrom(rpcRequest.getRequest()).build();
|
||||||
|
Message result;
|
||||||
|
try {
|
||||||
|
result = service.callBlockingMethod(methodDescriptor, null, param);
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
Throwable cause = e.getCause();
|
||||||
|
return handleException(cause != null ? cause : e);
|
||||||
|
} catch (Exception e) {
|
||||||
|
return handleException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
|
||||||
|
return new RpcResponseWritable(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
private RpcResponseWritable handleException(Throwable e) {
|
||||||
|
HadoopRpcExceptionProto exception = HadoopRpcExceptionProto.newBuilder()
|
||||||
|
.setExceptionName(e.getClass().getName())
|
||||||
|
.setStackTrace(StringUtils.stringifyException(e)).build();
|
||||||
|
HadoopRpcResponseProto response = HadoopRpcResponseProto.newBuilder()
|
||||||
|
.setStatus(ResponseStatus.ERRROR).setException(exception).build();
|
||||||
|
return new RpcResponseWritable(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
private HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
|
||||||
|
Message message) {
|
||||||
|
HadoopRpcResponseProto res = HadoopRpcResponseProto.newBuilder()
|
||||||
|
.setResponse(message.toByteString())
|
||||||
|
.setStatus(ResponseStatus.SUCCESS)
|
||||||
|
.build();
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -25,10 +25,9 @@ public class RpcServerException extends RpcException {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs exception with the specified detail message.
|
* Constructs exception with the specified detail message.
|
||||||
*
|
* @param message detailed message.
|
||||||
* @param messages detailed message.
|
|
||||||
*/
|
*/
|
||||||
RpcServerException(final String message) {
|
public RpcServerException(final String message) {
|
||||||
super(message);
|
super(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,12 +35,11 @@ public class RpcServerException extends RpcException {
|
||||||
* Constructs exception with the specified detail message and cause.
|
* Constructs exception with the specified detail message and cause.
|
||||||
*
|
*
|
||||||
* @param message message.
|
* @param message message.
|
||||||
* @param cause that cause this exception
|
|
||||||
* @param cause the cause (can be retried by the {@link #getCause()} method).
|
* @param cause the cause (can be retried by the {@link #getCause()} method).
|
||||||
* (A <tt>null</tt> value is permitted, and indicates that the cause
|
* (A <tt>null</tt> value is permitted, and indicates that the cause
|
||||||
* is nonexistent or unknown.)
|
* is nonexistent or unknown.)
|
||||||
*/
|
*/
|
||||||
RpcServerException(final String message, final Throwable cause) {
|
public RpcServerException(final String message, final Throwable cause) {
|
||||||
super(message, cause);
|
super(message, cause);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,73 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* These are the messages used by Hadoop RPC to marshal the
|
||||||
|
* request and response in the RPC layer.
|
||||||
|
*/
|
||||||
|
option java_package = "org.apache.hadoop.ipc.protobuf";
|
||||||
|
option java_outer_classname = "HadoopRpcProtos";
|
||||||
|
option java_generate_equals_and_hash = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Message used to marshal the client request
|
||||||
|
* from RPC client to the RPC server.
|
||||||
|
*/
|
||||||
|
message HadoopRpcRequestProto {
|
||||||
|
/** Name of the RPC method */
|
||||||
|
required string methodName = 1;
|
||||||
|
|
||||||
|
/** Bytes corresponding to the client protobuf request */
|
||||||
|
optional bytes request = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* At the RPC layer, this message is used to indicate
|
||||||
|
* the server side exception the the RPC client.
|
||||||
|
*
|
||||||
|
* Hadoop RPC client throws an exception indicated
|
||||||
|
* by exceptionName with the stackTrace.
|
||||||
|
*/
|
||||||
|
message HadoopRpcExceptionProto {
|
||||||
|
/** Class name of the exception thrown from the server */
|
||||||
|
|
||||||
|
optional string exceptionName = 1;
|
||||||
|
/** Exception stack trace from the server side */
|
||||||
|
optional string stackTrace = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This message is used to marshal the response from
|
||||||
|
* RPC server to the client.
|
||||||
|
*/
|
||||||
|
message HadoopRpcResponseProto {
|
||||||
|
/** Status of IPC call */
|
||||||
|
enum ResponseStatus {
|
||||||
|
SUCCESS = 1;
|
||||||
|
ERRROR = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
required ResponseStatus status = 1;
|
||||||
|
|
||||||
|
// Protobuf response payload from the server, when status is SUCCESS.
|
||||||
|
optional bytes response = 2;
|
||||||
|
|
||||||
|
// Exception when status is ERROR or FATAL
|
||||||
|
optional HadoopRpcExceptionProto exception = 3;
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,123 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.protobuf.BlockingService;
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for testing protocol buffer based RPC mechanism.
|
||||||
|
* This test depends on test.proto definition of types in src/test/proto
|
||||||
|
* and protobuf service definition from src/test/test_rpc_service.proto
|
||||||
|
*/
|
||||||
|
public class TestProtoBufRpc {
|
||||||
|
public final static String ADDRESS = "0.0.0.0";
|
||||||
|
public final static int PORT = 0;
|
||||||
|
|
||||||
|
public static class ServerImpl implements BlockingInterface {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EmptyResponseProto ping(RpcController unused,
|
||||||
|
EmptyRequestProto request) throws ServiceException {
|
||||||
|
return EmptyResponseProto.newBuilder().build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
|
||||||
|
throws ServiceException {
|
||||||
|
return EchoResponseProto.newBuilder().setMessage(request.getMessage())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EmptyResponseProto error(RpcController unused,
|
||||||
|
EmptyRequestProto request) throws ServiceException {
|
||||||
|
throw new ServiceException("error", new RpcServerException("error"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static RPC.Server startRPCServer(Configuration conf)
|
||||||
|
throws IOException {
|
||||||
|
// Set RPC engine to protobuf RPC engine
|
||||||
|
RPC.setProtocolEngine(conf, BlockingService.class, ProtobufRpcEngine.class);
|
||||||
|
|
||||||
|
// Create server side implementation
|
||||||
|
ServerImpl serverImpl = new ServerImpl();
|
||||||
|
BlockingService service = TestProtobufRpcProto
|
||||||
|
.newReflectiveBlockingService(serverImpl);
|
||||||
|
|
||||||
|
// Get RPC server for serer side implementation
|
||||||
|
RPC.Server server = RPC.getServer(BlockingService.class, service, ADDRESS,
|
||||||
|
PORT, conf);
|
||||||
|
server.start();
|
||||||
|
return server;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static BlockingInterface getClient(Configuration conf,
|
||||||
|
InetSocketAddress addr) throws IOException {
|
||||||
|
// Set RPC engine to protobuf RPC engine
|
||||||
|
RPC.setProtocolEngine(conf, BlockingInterface.class,
|
||||||
|
ProtobufRpcEngine.class);
|
||||||
|
BlockingInterface client = RPC.getProxy(BlockingInterface.class, 0, addr,
|
||||||
|
conf);
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProtoBufRpc() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
RPC.Server server = startRPCServer(conf);
|
||||||
|
BlockingInterface client = getClient(conf, server.getListenerAddress());
|
||||||
|
|
||||||
|
// Test ping method
|
||||||
|
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
||||||
|
client.ping(null, emptyRequest);
|
||||||
|
|
||||||
|
// Test echo method
|
||||||
|
EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
|
||||||
|
.setMessage("hello").build();
|
||||||
|
EchoResponseProto echoResponse = client.echo(null, echoRequest);
|
||||||
|
Assert.assertEquals(echoResponse.getMessage(), "hello");
|
||||||
|
|
||||||
|
// Test error method - it should be thrown as RemoteException
|
||||||
|
try {
|
||||||
|
client.error(null, emptyRequest);
|
||||||
|
Assert.fail("Expected exception is not thrown");
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
RemoteException re = (RemoteException)e.getCause();
|
||||||
|
re.printStackTrace();
|
||||||
|
RpcServerException rse = (RpcServerException) re
|
||||||
|
.unwrapRemoteException(RpcServerException.class);
|
||||||
|
rse.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,395 @@
|
||||||
|
// Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||||
|
// source: test_rpc_service.proto
|
||||||
|
|
||||||
|
package org.apache.hadoop.ipc.protobuf;
|
||||||
|
|
||||||
|
public final class TestRpcServiceProtos {
|
||||||
|
private TestRpcServiceProtos() {}
|
||||||
|
public static void registerAllExtensions(
|
||||||
|
com.google.protobuf.ExtensionRegistry registry) {
|
||||||
|
}
|
||||||
|
public static abstract class TestProtobufRpcProto
|
||||||
|
implements com.google.protobuf.Service {
|
||||||
|
protected TestProtobufRpcProto() {}
|
||||||
|
|
||||||
|
public interface Interface {
|
||||||
|
public abstract void ping(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto request,
|
||||||
|
com.google.protobuf.RpcCallback<org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto> done);
|
||||||
|
|
||||||
|
public abstract void echo(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto request,
|
||||||
|
com.google.protobuf.RpcCallback<org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto> done);
|
||||||
|
|
||||||
|
public abstract void error(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto request,
|
||||||
|
com.google.protobuf.RpcCallback<org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto> done);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static com.google.protobuf.Service newReflectiveService(
|
||||||
|
final Interface impl) {
|
||||||
|
return new TestProtobufRpcProto() {
|
||||||
|
@java.lang.Override
|
||||||
|
public void ping(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto request,
|
||||||
|
com.google.protobuf.RpcCallback<org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto> done) {
|
||||||
|
impl.ping(controller, request, done);
|
||||||
|
}
|
||||||
|
|
||||||
|
@java.lang.Override
|
||||||
|
public void echo(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto request,
|
||||||
|
com.google.protobuf.RpcCallback<org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto> done) {
|
||||||
|
impl.echo(controller, request, done);
|
||||||
|
}
|
||||||
|
|
||||||
|
@java.lang.Override
|
||||||
|
public void error(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto request,
|
||||||
|
com.google.protobuf.RpcCallback<org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto> done) {
|
||||||
|
impl.error(controller, request, done);
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public static com.google.protobuf.BlockingService
|
||||||
|
newReflectiveBlockingService(final BlockingInterface impl) {
|
||||||
|
return new com.google.protobuf.BlockingService() {
|
||||||
|
public final com.google.protobuf.Descriptors.ServiceDescriptor
|
||||||
|
getDescriptorForType() {
|
||||||
|
return getDescriptor();
|
||||||
|
}
|
||||||
|
|
||||||
|
public final com.google.protobuf.Message callBlockingMethod(
|
||||||
|
com.google.protobuf.Descriptors.MethodDescriptor method,
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
com.google.protobuf.Message request)
|
||||||
|
throws com.google.protobuf.ServiceException {
|
||||||
|
if (method.getService() != getDescriptor()) {
|
||||||
|
throw new java.lang.IllegalArgumentException(
|
||||||
|
"Service.callBlockingMethod() given method descriptor for " +
|
||||||
|
"wrong service type.");
|
||||||
|
}
|
||||||
|
switch(method.getIndex()) {
|
||||||
|
case 0:
|
||||||
|
return impl.ping(controller, (org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto)request);
|
||||||
|
case 1:
|
||||||
|
return impl.echo(controller, (org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto)request);
|
||||||
|
case 2:
|
||||||
|
return impl.error(controller, (org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto)request);
|
||||||
|
default:
|
||||||
|
throw new java.lang.AssertionError("Can't get here.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public final com.google.protobuf.Message
|
||||||
|
getRequestPrototype(
|
||||||
|
com.google.protobuf.Descriptors.MethodDescriptor method) {
|
||||||
|
if (method.getService() != getDescriptor()) {
|
||||||
|
throw new java.lang.IllegalArgumentException(
|
||||||
|
"Service.getRequestPrototype() given method " +
|
||||||
|
"descriptor for wrong service type.");
|
||||||
|
}
|
||||||
|
switch(method.getIndex()) {
|
||||||
|
case 0:
|
||||||
|
return org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto.getDefaultInstance();
|
||||||
|
case 1:
|
||||||
|
return org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto.getDefaultInstance();
|
||||||
|
case 2:
|
||||||
|
return org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto.getDefaultInstance();
|
||||||
|
default:
|
||||||
|
throw new java.lang.AssertionError("Can't get here.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public final com.google.protobuf.Message
|
||||||
|
getResponsePrototype(
|
||||||
|
com.google.protobuf.Descriptors.MethodDescriptor method) {
|
||||||
|
if (method.getService() != getDescriptor()) {
|
||||||
|
throw new java.lang.IllegalArgumentException(
|
||||||
|
"Service.getResponsePrototype() given method " +
|
||||||
|
"descriptor for wrong service type.");
|
||||||
|
}
|
||||||
|
switch(method.getIndex()) {
|
||||||
|
case 0:
|
||||||
|
return org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto.getDefaultInstance();
|
||||||
|
case 1:
|
||||||
|
return org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto.getDefaultInstance();
|
||||||
|
case 2:
|
||||||
|
return org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto.getDefaultInstance();
|
||||||
|
default:
|
||||||
|
throw new java.lang.AssertionError("Can't get here.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract void ping(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto request,
|
||||||
|
com.google.protobuf.RpcCallback<org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto> done);
|
||||||
|
|
||||||
|
public abstract void echo(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto request,
|
||||||
|
com.google.protobuf.RpcCallback<org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto> done);
|
||||||
|
|
||||||
|
public abstract void error(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto request,
|
||||||
|
com.google.protobuf.RpcCallback<org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto> done);
|
||||||
|
|
||||||
|
public static final
|
||||||
|
com.google.protobuf.Descriptors.ServiceDescriptor
|
||||||
|
getDescriptor() {
|
||||||
|
return org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.getDescriptor().getServices().get(0);
|
||||||
|
}
|
||||||
|
public final com.google.protobuf.Descriptors.ServiceDescriptor
|
||||||
|
getDescriptorForType() {
|
||||||
|
return getDescriptor();
|
||||||
|
}
|
||||||
|
|
||||||
|
public final void callMethod(
|
||||||
|
com.google.protobuf.Descriptors.MethodDescriptor method,
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
com.google.protobuf.Message request,
|
||||||
|
com.google.protobuf.RpcCallback<
|
||||||
|
com.google.protobuf.Message> done) {
|
||||||
|
if (method.getService() != getDescriptor()) {
|
||||||
|
throw new java.lang.IllegalArgumentException(
|
||||||
|
"Service.callMethod() given method descriptor for wrong " +
|
||||||
|
"service type.");
|
||||||
|
}
|
||||||
|
switch(method.getIndex()) {
|
||||||
|
case 0:
|
||||||
|
this.ping(controller, (org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto)request,
|
||||||
|
com.google.protobuf.RpcUtil.<org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto>specializeCallback(
|
||||||
|
done));
|
||||||
|
return;
|
||||||
|
case 1:
|
||||||
|
this.echo(controller, (org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto)request,
|
||||||
|
com.google.protobuf.RpcUtil.<org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto>specializeCallback(
|
||||||
|
done));
|
||||||
|
return;
|
||||||
|
case 2:
|
||||||
|
this.error(controller, (org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto)request,
|
||||||
|
com.google.protobuf.RpcUtil.<org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto>specializeCallback(
|
||||||
|
done));
|
||||||
|
return;
|
||||||
|
default:
|
||||||
|
throw new java.lang.AssertionError("Can't get here.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public final com.google.protobuf.Message
|
||||||
|
getRequestPrototype(
|
||||||
|
com.google.protobuf.Descriptors.MethodDescriptor method) {
|
||||||
|
if (method.getService() != getDescriptor()) {
|
||||||
|
throw new java.lang.IllegalArgumentException(
|
||||||
|
"Service.getRequestPrototype() given method " +
|
||||||
|
"descriptor for wrong service type.");
|
||||||
|
}
|
||||||
|
switch(method.getIndex()) {
|
||||||
|
case 0:
|
||||||
|
return org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto.getDefaultInstance();
|
||||||
|
case 1:
|
||||||
|
return org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto.getDefaultInstance();
|
||||||
|
case 2:
|
||||||
|
return org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto.getDefaultInstance();
|
||||||
|
default:
|
||||||
|
throw new java.lang.AssertionError("Can't get here.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public final com.google.protobuf.Message
|
||||||
|
getResponsePrototype(
|
||||||
|
com.google.protobuf.Descriptors.MethodDescriptor method) {
|
||||||
|
if (method.getService() != getDescriptor()) {
|
||||||
|
throw new java.lang.IllegalArgumentException(
|
||||||
|
"Service.getResponsePrototype() given method " +
|
||||||
|
"descriptor for wrong service type.");
|
||||||
|
}
|
||||||
|
switch(method.getIndex()) {
|
||||||
|
case 0:
|
||||||
|
return org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto.getDefaultInstance();
|
||||||
|
case 1:
|
||||||
|
return org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto.getDefaultInstance();
|
||||||
|
case 2:
|
||||||
|
return org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto.getDefaultInstance();
|
||||||
|
default:
|
||||||
|
throw new java.lang.AssertionError("Can't get here.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Stub newStub(
|
||||||
|
com.google.protobuf.RpcChannel channel) {
|
||||||
|
return new Stub(channel);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class Stub extends org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto implements Interface {
|
||||||
|
private Stub(com.google.protobuf.RpcChannel channel) {
|
||||||
|
this.channel = channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
private final com.google.protobuf.RpcChannel channel;
|
||||||
|
|
||||||
|
public com.google.protobuf.RpcChannel getChannel() {
|
||||||
|
return channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void ping(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto request,
|
||||||
|
com.google.protobuf.RpcCallback<org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto> done) {
|
||||||
|
channel.callMethod(
|
||||||
|
getDescriptor().getMethods().get(0),
|
||||||
|
controller,
|
||||||
|
request,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto.getDefaultInstance(),
|
||||||
|
com.google.protobuf.RpcUtil.generalizeCallback(
|
||||||
|
done,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto.class,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto.getDefaultInstance()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void echo(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto request,
|
||||||
|
com.google.protobuf.RpcCallback<org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto> done) {
|
||||||
|
channel.callMethod(
|
||||||
|
getDescriptor().getMethods().get(1),
|
||||||
|
controller,
|
||||||
|
request,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto.getDefaultInstance(),
|
||||||
|
com.google.protobuf.RpcUtil.generalizeCallback(
|
||||||
|
done,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto.class,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto.getDefaultInstance()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void error(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto request,
|
||||||
|
com.google.protobuf.RpcCallback<org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto> done) {
|
||||||
|
channel.callMethod(
|
||||||
|
getDescriptor().getMethods().get(2),
|
||||||
|
controller,
|
||||||
|
request,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto.getDefaultInstance(),
|
||||||
|
com.google.protobuf.RpcUtil.generalizeCallback(
|
||||||
|
done,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto.class,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto.getDefaultInstance()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static BlockingInterface newBlockingStub(
|
||||||
|
com.google.protobuf.BlockingRpcChannel channel) {
|
||||||
|
return new BlockingStub(channel);
|
||||||
|
}
|
||||||
|
|
||||||
|
public interface BlockingInterface {
|
||||||
|
public org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto ping(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto request)
|
||||||
|
throws com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
|
public org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto echo(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto request)
|
||||||
|
throws com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
|
public org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto error(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto request)
|
||||||
|
throws com.google.protobuf.ServiceException;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class BlockingStub implements BlockingInterface {
|
||||||
|
private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) {
|
||||||
|
this.channel = channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
private final com.google.protobuf.BlockingRpcChannel channel;
|
||||||
|
|
||||||
|
public org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto ping(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto request)
|
||||||
|
throws com.google.protobuf.ServiceException {
|
||||||
|
return (org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto) channel.callBlockingMethod(
|
||||||
|
getDescriptor().getMethods().get(0),
|
||||||
|
controller,
|
||||||
|
request,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto.getDefaultInstance());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto echo(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto request)
|
||||||
|
throws com.google.protobuf.ServiceException {
|
||||||
|
return (org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto) channel.callBlockingMethod(
|
||||||
|
getDescriptor().getMethods().get(1),
|
||||||
|
controller,
|
||||||
|
request,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto.getDefaultInstance());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto error(
|
||||||
|
com.google.protobuf.RpcController controller,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto request)
|
||||||
|
throws com.google.protobuf.ServiceException {
|
||||||
|
return (org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto) channel.callBlockingMethod(
|
||||||
|
getDescriptor().getMethods().get(2),
|
||||||
|
controller,
|
||||||
|
request,
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto.getDefaultInstance());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static com.google.protobuf.Descriptors.FileDescriptor
|
||||||
|
getDescriptor() {
|
||||||
|
return descriptor;
|
||||||
|
}
|
||||||
|
private static com.google.protobuf.Descriptors.FileDescriptor
|
||||||
|
descriptor;
|
||||||
|
static {
|
||||||
|
java.lang.String[] descriptorData = {
|
||||||
|
"\n\026test_rpc_service.proto\032\ntest.proto2\250\001\n" +
|
||||||
|
"\024TestProtobufRpcProto\022/\n\004ping\022\022.EmptyReq" +
|
||||||
|
"uestProto\032\023.EmptyResponseProto\022-\n\004echo\022\021" +
|
||||||
|
".EchoRequestProto\032\022.EchoResponseProto\0220\n" +
|
||||||
|
"\005error\022\022.EmptyRequestProto\032\023.EmptyRespon" +
|
||||||
|
"seProtoB<\n\036org.apache.hadoop.ipc.protobu" +
|
||||||
|
"fB\024TestRpcServiceProtos\210\001\001\240\001\001"
|
||||||
|
};
|
||||||
|
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||||
|
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||||
|
public com.google.protobuf.ExtensionRegistry assignDescriptors(
|
||||||
|
com.google.protobuf.Descriptors.FileDescriptor root) {
|
||||||
|
descriptor = root;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
com.google.protobuf.Descriptors.FileDescriptor
|
||||||
|
.internalBuildGeneratedFileFrom(descriptorData,
|
||||||
|
new com.google.protobuf.Descriptors.FileDescriptor[] {
|
||||||
|
org.apache.hadoop.ipc.protobuf.TestProtos.getDescriptor(),
|
||||||
|
}, assigner);
|
||||||
|
}
|
||||||
|
|
||||||
|
// @@protoc_insertion_point(outer_class_scope)
|
||||||
|
}
|
|
@ -0,0 +1,35 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
option java_package = "org.apache.hadoop.ipc.protobuf";
|
||||||
|
option java_outer_classname = "TestProtos";
|
||||||
|
option java_generate_equals_and_hash = true;
|
||||||
|
|
||||||
|
message EmptyRequestProto {
|
||||||
|
}
|
||||||
|
|
||||||
|
message EmptyResponseProto {
|
||||||
|
}
|
||||||
|
|
||||||
|
message EchoRequestProto {
|
||||||
|
required string message = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message EchoResponseProto {
|
||||||
|
required string message = 1;
|
||||||
|
}
|
|
@ -0,0 +1,33 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
option java_package = "org.apache.hadoop.ipc.protobuf";
|
||||||
|
option java_outer_classname = "TestRpcServiceProtos";
|
||||||
|
option java_generic_services = true;
|
||||||
|
option java_generate_equals_and_hash = true;
|
||||||
|
|
||||||
|
import "test.proto";
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A protobuf service for use in tests
|
||||||
|
*/
|
||||||
|
service TestProtobufRpcProto {
|
||||||
|
rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
|
||||||
|
rpc echo(EchoRequestProto) returns (EchoResponseProto);
|
||||||
|
rpc error(EmptyRequestProto) returns (EmptyResponseProto);
|
||||||
|
}
|
Loading…
Reference in New Issue