HBASE-5705 Introduce Protocol Buffer RPC engine

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1367009 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-07-30 07:07:17 +00:00
parent f86b1ecb03
commit cee7c32732
17 changed files with 4413 additions and 1051 deletions

View File

@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.util.HashMap;
import java.util.Map;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.io.Writable;
/**
* Cache a client using its socket factory as the hash key.
* Enables reuse/sharing of clients on a per SocketFactory basis. A client
* establishes certain configuration dependent characteristics like timeouts,
* tcp-keepalive (true or false), etc. For more details on the characteristics,
* look at {@link HBaseClient#HBaseClient(Class, Configuration, SocketFactory)}
* Creation of dynamic proxies to protocols creates the clients (and increments
* reference count once created), and stopping of the proxies leads to clearing
* out references and when the reference drops to zero, the cache mapping is
* cleared.
*/
class ClientCache {
private Map<SocketFactory, HBaseClient> clients =
new HashMap<SocketFactory, HBaseClient>();
protected ClientCache() {}
/**
* Construct & cache an IPC client with the user-provided SocketFactory
* if no cached client exists.
*
* @param conf Configuration
* @param factory socket factory
* @return an IPC client
*/
protected synchronized HBaseClient getClient(Configuration conf,
SocketFactory factory) {
return getClient(conf, factory, HbaseObjectWritable.class);
}
protected synchronized HBaseClient getClient(Configuration conf,
SocketFactory factory, Class<? extends Writable> valueClass) {
HBaseClient client = clients.get(factory);
if (client == null) {
// Make an hbase client instead of hadoop Client.
client = new HBaseClient(valueClass, conf, factory);
clients.put(factory, client);
} else {
client.incCount();
}
return client;
}
/**
* Stop a RPC client connection
* A RPC client is closed only when its reference count becomes zero.
* @param client client to stop
*/
protected void stopClient(HBaseClient client) {
synchronized (this) {
client.decCount();
if (client.isZeroReference()) {
clients.remove(client.getSocketFactory());
}
}
if (client.isZeroReference()) {
client.stop();
}
}
}

View File

@ -53,12 +53,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.DataOutputOutputStream;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
@ -82,7 +82,6 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import com.google.protobuf.ByteString;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a /** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on * parameter, and return a {@link Writable} as their value. A service runs on
@ -826,17 +825,15 @@ public class HBaseClient {
try { try {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id); LOG.debug(getName() + " sending #" + call.id);
RpcRequest.Builder builder = RPCProtos.RpcRequest.newBuilder(); RpcRequestHeader.Builder builder = RPCProtos.RpcRequestHeader.newBuilder();
builder.setCallId(call.id); builder.setCallId(call.id);
Invocation invocation = (Invocation)call.param;
DataOutputBuffer d = new DataOutputBuffer(); DataOutputBuffer d = new DataOutputBuffer();
invocation.write(d); builder.build().writeDelimitedTo(d);
builder.setRequest(ByteString.copyFrom(d.getData())); call.param.write(d);
//noinspection SynchronizeOnNonFinalField //noinspection SynchronizeOnNonFinalField
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
RpcRequest obj = builder.build(); this.out.writeInt(d.getLength());
this.out.writeInt(obj.getSerializedSize()); this.out.write(d.getData(), 0, d.getLength());
obj.writeTo(DataOutputOutputStream.constructOutputStream(this.out));
this.out.flush(); this.out.flush();
} }
} catch(IOException e) { } catch(IOException e) {
@ -859,7 +856,7 @@ public class HBaseClient {
// so the exception name/trace), and the response bytes // so the exception name/trace), and the response bytes
// Read the call id. // Read the call id.
RpcResponse response = RpcResponse.parseDelimitedFrom(in); RpcResponseHeader response = RpcResponseHeader.parseDelimitedFrom(in);
if (response == null) { if (response == null) {
// When the stream is closed, protobuf doesn't raise an EOFException, // When the stream is closed, protobuf doesn't raise an EOFException,
// instead, it returns a null message object. // instead, it returns a null message object.
@ -873,11 +870,8 @@ public class HBaseClient {
Status status = response.getStatus(); Status status = response.getStatus();
if (status == Status.SUCCESS) { if (status == Status.SUCCESS) {
ByteString responseObj = response.getResponse();
DataInputStream dis =
new DataInputStream(responseObj.newInput());
Writable value = ReflectionUtils.newInstance(valueClass, conf); Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(dis); // read value value.readFields(in); // read value
// it's possible that this call may have been cleaned up due to a RPC // it's possible that this call may have been cleaned up due to a RPC
// timeout, so check if it still exists before setting the value. // timeout, so check if it still exists before setting the value.
if (call != null) { if (call != null) {
@ -885,18 +879,20 @@ public class HBaseClient {
} }
calls.remove(id); calls.remove(id);
} else if (status == Status.ERROR) { } else if (status == Status.ERROR) {
RpcException exceptionResponse = RpcException.parseDelimitedFrom(in);
if (call != null) { if (call != null) {
//noinspection ThrowableInstanceNeverThrown //noinspection ThrowableInstanceNeverThrown
call.setException(new RemoteException( call.setException(new RemoteException(
response.getException().getExceptionName(), exceptionResponse.getExceptionName(),
response.getException().getStackTrace())); exceptionResponse.getStackTrace()));
calls.remove(id); calls.remove(id);
} }
} else if (status == Status.FATAL) { } else if (status == Status.FATAL) {
RpcException exceptionResponse = RpcException.parseDelimitedFrom(in);
// Close the connection // Close the connection
markClosed(new RemoteException( markClosed(new RemoteException(
response.getException().getExceptionName(), exceptionResponse.getExceptionName(),
response.getException().getStackTrace())); exceptionResponse.getStackTrace()));
} }
} catch (IOException e) { } catch (IOException e) {
if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) { if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -101,6 +102,13 @@ public class HBaseRPC {
} }
}; };
static long getProtocolVersion(Class<? extends VersionedProtocol> protocol)
throws NoSuchFieldException, IllegalAccessException {
Field versionField = protocol.getField("VERSION");
versionField.setAccessible(true);
return versionField.getLong(protocol);
}
// set a protocol to use a non-default RpcEngine // set a protocol to use a non-default RpcEngine
static void setProtocolEngine(Configuration conf, static void setProtocolEngine(Configuration conf,
Class protocol, Class engine) { Class protocol, Class engine) {
@ -333,17 +341,22 @@ public class HBaseRPC {
long clientVersion, InetSocketAddress addr, User ticket, long clientVersion, InetSocketAddress addr, User ticket,
Configuration conf, SocketFactory factory, int rpcTimeout) Configuration conf, SocketFactory factory, int rpcTimeout)
throws IOException { throws IOException {
VersionedProtocol proxy = RpcEngine engine = getProtocolEngine(protocol,conf);
getProtocolEngine(protocol,conf) VersionedProtocol proxy = engine
.getProxy(protocol, clientVersion, addr, ticket, conf, factory, Math.min(rpcTimeout, HBaseRPC.getRpcTimeout())); .getProxy(protocol, clientVersion, addr, ticket, conf, factory,
Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
if (engine instanceof WritableRpcEngine) {
long serverVersion = proxy.getProtocolVersion(protocol.getName(), long serverVersion = proxy.getProtocolVersion(protocol.getName(),
clientVersion); clientVersion);
if (serverVersion == clientVersion) { if (serverVersion == clientVersion) {
return proxy; return proxy;
} }
throw new VersionMismatch(protocol.getName(), clientVersion, throw new VersionMismatch(protocol.getName(), clientVersion,
serverVersion); serverVersion);
} }
return proxy;
}
/** /**
* Construct a client-side proxy object with the default SocketFactory * Construct a client-side proxy object with the default SocketFactory

View File

@ -73,11 +73,11 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.io.WritableWithSize; import org.apache.hadoop.hbase.io.WritableWithSize;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
@ -381,23 +381,21 @@ public abstract class HBaseServer implements RpcServer {
} }
ByteBufferOutputStream buf = new ByteBufferOutputStream(size); ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
DataOutputStream out = new DataOutputStream(buf);
try { try {
RpcResponse.Builder builder = RpcResponse.newBuilder(); RpcResponseHeader.Builder builder = RpcResponseHeader.newBuilder();
// Call id. // Call id.
builder.setCallId(this.id); builder.setCallId(this.id);
builder.setStatus(status); builder.setStatus(status);
builder.build().writeDelimitedTo(out);
if (error != null) { if (error != null) {
RpcException.Builder b = RpcException.newBuilder(); RpcException.Builder b = RpcException.newBuilder();
b.setExceptionName(errorClass); b.setExceptionName(errorClass);
b.setStackTrace(error); b.setStackTrace(error);
builder.setException(b.build()); b.build().writeDelimitedTo(out);
} else { } else {
DataOutputBuffer d = new DataOutputBuffer(size); result.write(out);
result.write(d);
byte[] response = d.getData();
builder.setResponse(ByteString.copyFrom(response));
} }
builder.build().writeDelimitedTo(buf);
if (connection.useWrap) { if (connection.useWrap) {
wrapWithSasl(buf); wrapWithSasl(buf);
} }
@ -1616,9 +1614,10 @@ public abstract class HBaseServer implements RpcServer {
} }
protected void processData(byte[] buf) throws IOException, InterruptedException { protected void processData(byte[] buf) throws IOException, InterruptedException {
RpcRequest request = RpcRequest.parseFrom(buf); DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
RpcRequestHeader request = RpcRequestHeader.parseDelimitedFrom(dis);
int id = request.getCallId(); int id = request.getCallId();
ByteString clientRequest = request.getRequest();
long callSize = buf.length; long callSize = buf.length;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -1639,8 +1638,6 @@ public abstract class HBaseServer implements RpcServer {
Writable param; Writable param;
try { try {
DataInputStream dis =
new DataInputStream(clientRequest.newInput());
param = ReflectionUtils.newInstance(paramClass, conf);//read param param = ReflectionUtils.newInstance(paramClass, conf);//read param
param.readFields(dis); param.readFields(dis);
} catch (Throwable t) { } catch (Throwable t) {

View File

@ -57,7 +57,7 @@ public class Invocation extends VersionedWritable implements Configurable {
// For generated protocol classes which don't have VERSION field, // For generated protocol classes which don't have VERSION field,
// such as protobuf interfaces. // such as protobuf interfaces.
private static final Map<Class<?>, Long> static final Map<Class<?>, Long>
PROTOCOL_VERSION = new HashMap<Class<?>, Long>(); PROTOCOL_VERSION = new HashMap<Class<?>, Long>();
static { static {

View File

@ -0,0 +1,501 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.DataOutputOutputStream;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.io.*;
import org.apache.hadoop.hbase.util.Objects;
import org.apache.hadoop.hbase.util.ProtoUtil;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
/**
* The {@link RpcEngine} implementation for ProtoBuf-based RPCs.
*/
@InterfaceAudience.Private
class ProtobufRpcEngine implements RpcEngine {
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcEngine");
protected final static ClientCache CLIENTS = new ClientCache();
@Override
public VersionedProtocol getProxy(
Class<? extends VersionedProtocol> protocol, long clientVersion,
InetSocketAddress addr, User ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
rpcTimeout);
return (VersionedProtocol)Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker);
}
@Override
public void stopProxy(VersionedProtocol proxy) {
if (proxy!=null) {
((Invoker)Proxy.getInvocationHandler(proxy)).close();
}
}
@Override
public Server getServer(Class<? extends VersionedProtocol> protocol,
Object instance, Class<?>[] ifaces, String bindAddress, int port,
int numHandlers, int metaHandlerCount, boolean verbose,
Configuration conf, int highPriorityLevel) throws IOException {
return new Server(instance, ifaces, conf, bindAddress, port, numHandlers,
metaHandlerCount, verbose, highPriorityLevel);
}
private static class Invoker implements InvocationHandler {
private final Map<String, Message> returnTypes =
new ConcurrentHashMap<String, Message>();
private Class<? extends VersionedProtocol> protocol;
private InetSocketAddress address;
private User ticket;
private HBaseClient client;
private boolean isClosed = false;
final private int rpcTimeout;
private final long clientProtocolVersion;
public Invoker(Class<? extends VersionedProtocol> protocol,
InetSocketAddress addr, User ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
this.protocol = protocol;
this.address = addr;
this.ticket = ticket;
this.client = CLIENTS.getClient(conf, factory, RpcResponseWritable.class);
this.rpcTimeout = rpcTimeout;
Long version = Invocation.PROTOCOL_VERSION.get(protocol);
if (version != null) {
this.clientProtocolVersion = version;
} else {
try {
this.clientProtocolVersion = HBaseRPC.getProtocolVersion(protocol);
} catch (NoSuchFieldException e) {
throw new RuntimeException("Exception encountered during " +
protocol, e);
} catch (IllegalAccessException e) {
throw new RuntimeException("Exception encountered during " +
protocol, e);
}
}
}
private RpcRequestBody constructRpcRequest(Method method,
Object[] params) throws ServiceException {
RpcRequestBody rpcRequest;
RpcRequestBody.Builder builder = RpcRequestBody.newBuilder();
builder.setMethodName(method.getName());
Message param;
int length = params.length;
if (length == 2) {
// RpcController + Message in the method args
// (generated code from RPC bits in .proto files have RpcController)
param = (Message)params[1];
} else if (length == 1) { // Message
param = (Message)params[0];
} else {
throw new ServiceException("Too many parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ params.length);
}
builder.setRequest(param.toByteString());
builder.setClientProtocolVersion(clientProtocolVersion);
rpcRequest = builder.build();
return rpcRequest;
}
/**
* This is the client side invoker of RPC method. It only throws
* ServiceException, since the invocation proxy expects only
* ServiceException to be thrown by the method in case protobuf service.
*
* ServiceException has the following causes:
* <ol>
* <li>Exceptions encountered on the client side in this method are
* set as cause in ServiceException as is.</li>
* <li>Exceptions from the server are wrapped in RemoteException and are
* set as cause in ServiceException</li>
* </ol>
*
* Note that the client calling protobuf RPC methods, must handle
* ServiceException by getting the cause from the ServiceException. If the
* cause is RemoteException, then unwrap it to get the exception thrown by
* the server.
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws ServiceException {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = System.currentTimeMillis();
}
RpcRequestBody rpcRequest = constructRpcRequest(method, args);
RpcResponseWritable val = null;
try {
val = (RpcResponseWritable) client.call(
new RpcRequestWritable(rpcRequest), address, protocol, ticket,
rpcTimeout);
if (LOG.isDebugEnabled()) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
Message protoType = null;
protoType = getReturnProtoType(method);
Message returnMessage;
returnMessage = protoType.newBuilderForType()
.mergeFrom(val.responseMessage).build();
return returnMessage;
} catch (Throwable e) {
throw new ServiceException(e);
}
}
synchronized protected void close() {
if (!isClosed) {
isClosed = true;
CLIENTS.stopClient(client);
}
}
private Message getReturnProtoType(Method method) throws Exception {
if (returnTypes.containsKey(method.getName())) {
return returnTypes.get(method.getName());
}
Class<?> returnType = method.getReturnType();
Method newInstMethod = returnType.getMethod("getDefaultInstance");
newInstMethod.setAccessible(true);
Message protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
returnTypes.put(method.getName(), protoType);
return protoType;
}
}
/**
* Writable Wrapper for Protocol Buffer Requests
*/
private static class RpcRequestWritable implements Writable {
RpcRequestBody message;
@SuppressWarnings("unused")
public RpcRequestWritable() {
}
RpcRequestWritable(RpcRequestBody message) {
this.message = message;
}
@Override
public void write(DataOutput out) throws IOException {
((Message)message).writeDelimitedTo(
DataOutputOutputStream.constructOutputStream(out));
}
@Override
public void readFields(DataInput in) throws IOException {
int length = ProtoUtil.readRawVarint32(in);
byte[] bytes = new byte[length];
in.readFully(bytes);
message = RpcRequestBody.parseFrom(bytes);
}
public int getSerializedSize() {
return message.getSerializedSize();
}
@Override
public String toString() {
return " Client Protocol Version: " +
message.getClientProtocolVersion() + " MethodName: " +
message.getMethodName();
}
}
/**
* Writable Wrapper for Protocol Buffer Responses
*/
private static class RpcResponseWritable implements Writable {
byte[] responseMessage;
@SuppressWarnings("unused")
public RpcResponseWritable() {
}
public RpcResponseWritable(Message message) {
this.responseMessage = message.toByteArray();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(responseMessage.length);
out.write(responseMessage);
}
@Override
public void readFields(DataInput in) throws IOException {
int length = in.readInt();
byte[] bytes = new byte[length];
in.readFully(bytes);
responseMessage = bytes;
}
}
public static class Server extends WritableRpcEngine.Server {
boolean verbose;
Object instance;
Class<?> implementation;
private static final String WARN_RESPONSE_TIME =
"hbase.ipc.warn.response.time";
private static final String WARN_RESPONSE_SIZE =
"hbase.ipc.warn.response.size";
/** Default value for above params */
private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
/** Names for suffixed metrics */
private static final String ABOVE_ONE_SEC_METRIC = ".aboveOneSec.";
private final int warnResponseTime;
private final int warnResponseSize;
public Server(Object instance, final Class<?>[] ifaces,
Configuration conf, String bindAddress, int port,
int numHandlers, int metaHandlerCount, boolean verbose,
int highPriorityLevel)
throws IOException {
super(instance, ifaces, RpcRequestWritable.class, conf, bindAddress, port,
numHandlers, metaHandlerCount, verbose, highPriorityLevel);
this.verbose = verbose;
this.instance = instance;
this.implementation = instance.getClass();
// create metrics for the advertised interfaces this server implements.
String [] metricSuffixes = new String [] {ABOVE_ONE_SEC_METRIC};
this.rpcMetrics.createMetrics(ifaces, false, metricSuffixes);
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME,
DEFAULT_WARN_RESPONSE_TIME);
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
DEFAULT_WARN_RESPONSE_SIZE);
}
private final Map<String, Message> methodArg =
new ConcurrentHashMap<String, Message>();
private final Map<String, Method> methodInstances =
new ConcurrentHashMap<String, Method>();
@Override
/**
* This is a server side method, which is invoked over RPC. On success
* the return response has protobuf response payload. On failure, the
* exception name and the stack trace are returned in the protobuf response.
*/
public Writable call(Class<? extends VersionedProtocol> protocol,
Writable writableRequest, long receiveTime, MonitoredRPCHandler status)
throws IOException {
try {
RpcRequestWritable request = (RpcRequestWritable) writableRequest;
RpcRequestBody rpcRequest = request.message;
String methodName = rpcRequest.getMethodName();
Method method = getMethod(protocol, methodName);
if (method == null) {
throw new HBaseRPC.UnknownProtocolException("Method " + methodName +
" doesn't exist in protocol " + protocol.getName());
}
/**
* RPCs for a particular interface (ie protocol) are done using a
* IPC connection that is setup using rpcProxy.
* The rpcProxy's has a declared protocol name that is
* sent form client to server at connection time.
*/
//TODO: use the clientVersion to do protocol compatibility checks, and
//this could be used here to handle complex use cases like deciding
//which implementation of the protocol should be used to service the
//current request, etc. Ideally, we shouldn't land up in a situation
//where we need to support such a use case.
//For now the clientVersion field is simply ignored
long clientVersion = rpcRequest.getClientProtocolVersion();
if (verbose) {
LOG.info("Call: protocol name=" + protocol.getName() +
", method=" + methodName);
}
status.setRPC(rpcRequest.getMethodName(),
new Object[]{rpcRequest.getRequest()}, receiveTime);
status.setRPCPacket(writableRequest);
status.resume("Servicing call");
//get an instance of the method arg type
Message protoType = getMethodArgType(method);
Message param = protoType.newBuilderForType()
.mergeFrom(rpcRequest.getRequest()).build();
Message result;
Object impl = null;
if (protocol.isAssignableFrom(this.implementation)) {
impl = this.instance;
} else {
throw new HBaseRPC.UnknownProtocolException(protocol);
}
long startTime = System.currentTimeMillis();
if (method.getParameterTypes().length == 2) {
// RpcController + Message in the method args
// (generated code from RPC bits in .proto files have RpcController)
result = (Message)method.invoke(impl, null, param);
} else if (method.getParameterTypes().length == 1) {
// Message (hand written code usually has only a single argument)
result = (Message)method.invoke(impl, param);
} else {
throw new ServiceException("Too many parameters for method: ["
+ method.getName() + "]" + ", allowed (at most): 2, Actual: "
+ method.getParameterTypes().length);
}
int processingTime = (int) (System.currentTimeMillis() - startTime);
int qTime = (int) (startTime-receiveTime);
if (TRACELOG.isDebugEnabled()) {
TRACELOG.debug("Call #" + CurCall.get().id +
"; Served: " + protocol.getSimpleName()+"#"+method.getName() +
" queueTime=" + qTime +
" processingTime=" + processingTime +
" contents=" + Objects.describeQuantity(param));
}
rpcMetrics.rpcQueueTime.inc(qTime);
rpcMetrics.rpcProcessingTime.inc(processingTime);
rpcMetrics.inc(method.getName(), processingTime);
if (verbose) {
WritableRpcEngine.log("Return: "+result, LOG);
}
long responseSize = result.getSerializedSize();
// log any RPC responses that are slower than the configured warn
// response time or larger than configured warning size
boolean tooSlow = (processingTime > warnResponseTime
&& warnResponseTime > -1);
boolean tooLarge = (responseSize > warnResponseSize
&& warnResponseSize > -1);
if (tooSlow || tooLarge) {
// when tagging, we let TooLarge trump TooSmall to keep output simple
// note that large responses will often also be slow.
StringBuilder buffer = new StringBuilder(256);
buffer.append(methodName);
buffer.append("(");
buffer.append(param.getClass().getName());
buffer.append(")");
buffer.append(", client version="+clientVersion);
logResponse(new Object[]{rpcRequest.getRequest()},
methodName, buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
status.getClient(), startTime, processingTime, qTime,
responseSize);
// provides a count of log-reported slow responses
if (tooSlow) {
rpcMetrics.rpcSlowResponseTime.inc(processingTime);
}
}
if (processingTime > 1000) {
// we use a hard-coded one second period so that we can clearly
// indicate the time period we're warning about in the name of the
// metric itself
rpcMetrics.inc(method.getName() + ABOVE_ONE_SEC_METRIC,
processingTime);
}
return new RpcResponseWritable(result);
} catch (InvocationTargetException e) {
Throwable target = e.getTargetException();
if (target instanceof IOException) {
throw (IOException)target;
}
if (target instanceof ServiceException) {
throw ProtobufUtil.getRemoteException((ServiceException)target);
}
IOException ioe = new IOException(target.toString());
ioe.setStackTrace(target.getStackTrace());
throw ioe;
} catch (Throwable e) {
if (!(e instanceof IOException)) {
LOG.error("Unexpected throwable object ", e);
}
IOException ioe = new IOException(e.toString());
ioe.setStackTrace(e.getStackTrace());
throw ioe;
}
}
private Method getMethod(Class<? extends VersionedProtocol> protocol,
String methodName) {
Method method = methodInstances.get(methodName);
if (method != null) {
return method;
}
Method[] methods = protocol.getMethods();
LOG.warn("Methods length : " + methods.length);
for (Method m : methods) {
if (m.getName().equals(methodName)) {
m.setAccessible(true);
methodInstances.put(methodName, m);
return m;
}
}
return null;
}
private Message getMethodArgType(Method method) throws Exception {
Message protoType = methodArg.get(method.getName());
if (protoType != null) {
return protoType;
}
Class<?>[] args = method.getParameterTypes();
Class<?> arg;
if (args.length == 2) {
// RpcController + Message in the method args
// (generated code from RPC bits in .proto files have RpcController)
arg = args[1];
} else if (args.length == 1) {
arg = args[0];
} else {
//unexpected
return null;
}
//in the protobuf methods, args[1] is the only significant argument
Method newInstMethod = arg.getMethod("getDefaultInstance");
newInstMethod.setAccessible(true);
protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
methodArg.put(method.getName(), protoType);
return protoType;
}
}
}

View File

@ -28,18 +28,14 @@ import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.io.*; import java.io.*;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
import java.util.Set;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import org.apache.commons.logging.*; import org.apache.commons.logging.*;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
@ -69,57 +65,6 @@ class WritableRpcEngine implements RpcEngine {
// DEBUG log level does NOT emit RPC-level logging. // DEBUG log level does NOT emit RPC-level logging.
private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine"); private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine");
/* Cache a client using its socket factory as the hash key */
static private class ClientCache {
private Map<SocketFactory, HBaseClient> clients =
new HashMap<SocketFactory, HBaseClient>();
protected ClientCache() {}
/**
* Construct & cache an IPC client with the user-provided SocketFactory
* if no cached client exists.
*
* @param conf Configuration
* @param factory socket factory
* @return an IPC client
*/
protected synchronized HBaseClient getClient(Configuration conf,
SocketFactory factory) {
// Construct & cache client. The configuration is only used for timeout,
// and Clients have connection pools. So we can either (a) lose some
// connection pooling and leak sockets, or (b) use the same timeout for
// all configurations. Since the IPC is usually intended globally, not
// per-job, we choose (a).
HBaseClient client = clients.get(factory);
if (client == null) {
// Make an hbase client instead of hadoop Client.
client = new HBaseClient(HbaseObjectWritable.class, conf, factory);
clients.put(factory, client);
} else {
client.incCount();
}
return client;
}
/**
* Stop a RPC client connection
* A RPC client is closed only when its reference count becomes zero.
* @param client client to stop
*/
protected void stopClient(HBaseClient client) {
synchronized (this) {
client.decCount();
if (client.isZeroReference()) {
clients.remove(client.getSocketFactory());
}
}
if (client.isZeroReference()) {
client.stop();
}
}
}
protected final static ClientCache CLIENTS = new ClientCache(); protected final static ClientCache CLIENTS = new ClientCache();
private static class Invoker implements InvocationHandler { private static class Invoker implements InvocationHandler {
@ -150,8 +95,8 @@ class WritableRpcEngine implements RpcEngine {
try { try {
HbaseObjectWritable value = (HbaseObjectWritable) HbaseObjectWritable value = (HbaseObjectWritable)
client.call(new Invocation(method, args), address, client.call(new Invocation(method, args), address, protocol, ticket,
protocol, ticket, rpcTimeout); rpcTimeout);
if (logDebug) { if (logDebug) {
// FIGURE HOW TO TURN THIS OFF! // FIGURE HOW TO TURN THIS OFF!
long callTime = System.currentTimeMillis() - startTime; long callTime = System.currentTimeMillis() - startTime;
@ -271,18 +216,23 @@ class WritableRpcEngine implements RpcEngine {
/** Construct an RPC server. /** Construct an RPC server.
* @param instance the instance whose methods will be called * @param instance the instance whose methods will be called
* @param ifaces the interfaces the server supports
* @param paramClass an instance of this class is used to read the RPC requests
* @param conf the configuration to use * @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection * @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on * @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run * @param numHandlers the number of method handler threads to run
* @param metaHandlerCount the number of meta handlers desired
* @param verbose whether each call should be logged * @param verbose whether each call should be logged
* @param highPriorityLevel the priority level this server treats as high priority RPCs
* @throws IOException e * @throws IOException e
*/ */
public Server(Object instance, final Class<?>[] ifaces, public Server(Object instance, final Class<?>[] ifaces,
Class<? extends Writable> paramClass,
Configuration conf, String bindAddress, int port, Configuration conf, String bindAddress, int port,
int numHandlers, int metaHandlerCount, boolean verbose, int numHandlers, int metaHandlerCount, boolean verbose,
int highPriorityLevel) throws IOException { int highPriorityLevel) throws IOException {
super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount, super(bindAddress, port, paramClass, numHandlers, metaHandlerCount,
conf, classNameBase(instance.getClass().getName()), conf, classNameBase(instance.getClass().getName()),
highPriorityLevel); highPriorityLevel);
this.instance = instance; this.instance = instance;
@ -301,6 +251,14 @@ class WritableRpcEngine implements RpcEngine {
DEFAULT_WARN_RESPONSE_SIZE); DEFAULT_WARN_RESPONSE_SIZE);
} }
public Server(Object instance, final Class<?>[] ifaces,
Configuration conf, String bindAddress, int port,
int numHandlers, int metaHandlerCount, boolean verbose,
int highPriorityLevel) throws IOException {
this(instance, ifaces, Invocation.class, conf, bindAddress, port,
numHandlers, metaHandlerCount, verbose, highPriorityLevel);
}
public AuthenticationTokenSecretManager createSecretManager(){ public AuthenticationTokenSecretManager createSecretManager(){
if (!User.isSecurityEnabled() || if (!User.isSecurityEnabled() ||
!(instance instanceof org.apache.hadoop.hbase.Server)) { !(instance instanceof org.apache.hadoop.hbase.Server)) {
@ -341,7 +299,7 @@ class WritableRpcEngine implements RpcEngine {
throw new IOException("Could not find requested method, the usual " + throw new IOException("Could not find requested method, the usual " +
"cause is a version mismatch between client and server."); "cause is a version mismatch between client and server.");
} }
if (verbose) log("Call: " + call); if (verbose) log("Call: " + call, LOG);
status.setRPC(call.getMethodName(), call.getParameters(), receivedTime); status.setRPC(call.getMethodName(), call.getParameters(), receivedTime);
status.setRPCPacket(param); status.setRPCPacket(param);
status.resume("Servicing call"); status.resume("Servicing call");
@ -389,7 +347,7 @@ class WritableRpcEngine implements RpcEngine {
rpcMetrics.rpcQueueTime.inc(qTime); rpcMetrics.rpcQueueTime.inc(qTime);
rpcMetrics.rpcProcessingTime.inc(processingTime); rpcMetrics.rpcProcessingTime.inc(processingTime);
rpcMetrics.inc(call.getMethodName(), processingTime); rpcMetrics.inc(call.getMethodName(), processingTime);
if (verbose) log("Return: "+value); if (verbose) log("Return: "+value, LOG);
HbaseObjectWritable retVal = HbaseObjectWritable retVal =
new HbaseObjectWritable(method.getReturnType(), value); new HbaseObjectWritable(method.getReturnType(), value);
@ -403,7 +361,8 @@ class WritableRpcEngine implements RpcEngine {
if (tooSlow || tooLarge) { if (tooSlow || tooLarge) {
// when tagging, we let TooLarge trump TooSmall to keep output simple // when tagging, we let TooLarge trump TooSmall to keep output simple
// note that large responses will often also be slow. // note that large responses will often also be slow.
logResponse(call, (tooLarge ? "TooLarge" : "TooSlow"), logResponse(call.getParameters(), call.getMethodName(),
call.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
status.getClient(), startTime, processingTime, qTime, status.getClient(), startTime, processingTime, qTime,
responseSize); responseSize);
// provides a count of log-reported slow responses // provides a count of log-reported slow responses
@ -444,7 +403,9 @@ class WritableRpcEngine implements RpcEngine {
/** /**
* Logs an RPC response to the LOG file, producing valid JSON objects for * Logs an RPC response to the LOG file, producing valid JSON objects for
* client Operations. * client Operations.
* @param call The call to log. * @param params The parameters received in the call.
* @param methodName The name of the method invoked
* @param call The string representation of the call
* @param tag The tag that will be used to indicate this event in the log. * @param tag The tag that will be used to indicate this event in the log.
* @param client The address of the client who made this call. * @param client The address of the client who made this call.
* @param startTime The time that the call was initiated, in ms. * @param startTime The time that the call was initiated, in ms.
@ -453,10 +414,10 @@ class WritableRpcEngine implements RpcEngine {
* prior to being initiated, in ms. * prior to being initiated, in ms.
* @param responseSize The size in bytes of the response buffer. * @param responseSize The size in bytes of the response buffer.
*/ */
private void logResponse(Invocation call, String tag, String clientAddress, void logResponse(Object[] params, String methodName, String call, String tag,
long startTime, int processingTime, int qTime, long responseSize) String clientAddress, long startTime, int processingTime, int qTime,
long responseSize)
throws IOException { throws IOException {
Object params[] = call.getParameters();
// for JSON encoding // for JSON encoding
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
// base information that is reported regardless of type of call // base information that is reported regardless of type of call
@ -467,7 +428,7 @@ class WritableRpcEngine implements RpcEngine {
responseInfo.put("responsesize", responseSize); responseInfo.put("responsesize", responseSize);
responseInfo.put("client", clientAddress); responseInfo.put("client", clientAddress);
responseInfo.put("class", instance.getClass().getSimpleName()); responseInfo.put("class", instance.getClass().getSimpleName());
responseInfo.put("method", call.getMethodName()); responseInfo.put("method", methodName);
if (params.length == 2 && instance instanceof HRegionServer && if (params.length == 2 && instance instanceof HRegionServer &&
params[0] instanceof byte[] && params[0] instanceof byte[] &&
params[1] instanceof Operation) { params[1] instanceof Operation) {
@ -491,14 +452,14 @@ class WritableRpcEngine implements RpcEngine {
} else { } else {
// can't get JSON details, so just report call.toString() along with // can't get JSON details, so just report call.toString() along with
// a more generic tag. // a more generic tag.
responseInfo.put("call", call.toString()); responseInfo.put("call", call);
LOG.warn("(response" + tag + "): " + LOG.warn("(response" + tag + "): " +
mapper.writeValueAsString(responseInfo)); mapper.writeValueAsString(responseInfo));
} }
} }
} }
protected static void log(String value) { protected static void log(String value, Log LOG) {
String v = value; String v = value;
if (v != null && v.length() > 55) if (v != null && v.length() > 55)
v = v.substring(0, 55)+"..."; v = v.substring(0, 55)+"...";

View File

@ -80,6 +80,8 @@ import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.ipc.ProtocolSignature; import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
@ -99,8 +101,6 @@ import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer; import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@ -331,7 +331,7 @@ Server {
} }
int numHandlers = conf.getInt("hbase.master.handler.count", int numHandlers = conf.getInt("hbase.master.handler.count",
conf.getInt("hbase.regionserver.handler.count", 25)); conf.getInt("hbase.regionserver.handler.count", 25));
this.rpcServer = HBaseRPC.getServer(this, this.rpcServer = HBaseRPC.getServer(MasterMonitorProtocol.class, this,
new Class<?>[]{MasterMonitorProtocol.class, new Class<?>[]{MasterMonitorProtocol.class,
MasterAdminProtocol.class, RegionServerStatusProtocol.class}, MasterAdminProtocol.class, RegionServerStatusProtocol.class},
initialIsa.getHostName(), // BindAddress is IP we got for this server. initialIsa.getHostName(), // BindAddress is IP we got for this server.

View File

@ -261,5 +261,4 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
} }
return super.toString() + ", rpcMethod=" + getRPC(); return super.toString() + ", rpcMethod=" + getRPC();
} }
} }

View File

@ -499,7 +499,7 @@ public class HRegionServer implements ClientProtocol,
throw new IllegalArgumentException("Failed resolve of " + initialIsa); throw new IllegalArgumentException("Failed resolve of " + initialIsa);
} }
this.rpcServer = HBaseRPC.getServer(this, this.rpcServer = HBaseRPC.getServer(AdminProtocol.class, this,
new Class<?>[]{ClientProtocol.class, new Class<?>[]{ClientProtocol.class,
AdminProtocol.class, HBaseRPCErrorHandler.class, AdminProtocol.class, HBaseRPCErrorHandler.class,
OnlineRegions.class}, OnlineRegions.class},

View File

@ -26,15 +26,20 @@
* *
* As part of setting up a connection to a server, the client needs to send * As part of setting up a connection to a server, the client needs to send
* the ConnectionHeader header. At the data level, this looks like * the ConnectionHeader header. At the data level, this looks like
* <"hrpc"-bytearray><5-byte><length-of-serialized-ConnectionHeader-obj[int32]><ConnectionHeader-object serialized> * <"hrpc"-bytearray><'5'-byte><length-of-serialized-ConnectionHeader-obj[int32]><ConnectionHeader-object serialized>
* *
* For every RPC that the client makes it needs to send the * For every RPC that the client makes it needs to send the following
* RpcRequest. At the data level this looks like * RpcRequestHeader and the RpcRequestBody. At the data level this looks like:
* <length-of-serialized-RpcRequest-obj><RpcRequest-object serialized> * <length-of-serialized-RpcRequestHeader + length-of-serialized-RpcRequestBody>
* <RpcRequestHeader [serialized using Message.writeDelimitedTo]>
* <RpcRequestBody [serialized using Message.writeTo]>
* *
* The server sends back a RpcResponse object as response. * On a success, the server's protobuf response looks like
* At the data level this looks like * <RpcResponseHeader-object [serialized using Message.writeDelimitedTo]>
* <protobuf-encoded-length-of-serialized-RpcResponse-obj><RpcResponse-object serialized> * <RpcResponseBody-object [serialized using Message.writeTo]>
* On a failure, the server's protobuf response looks like
* <RpcResponseHeader-object [serialized using Message.writeDelimitedTo]>
* <RpcException-object [serialized using Message.writeTo]>
* *
* There is one special message that's sent from client to server - * There is one special message that's sent from client to server -
* the Ping message. At the data level, this is just the bytes corresponding * the Ping message. At the data level, this is just the bytes corresponding
@ -63,15 +68,47 @@ message ConnectionHeader {
/** /**
* The complete RPC request message * The RPC request header
*/ */
message RpcRequest { message RpcRequestHeader {
/** Monotonically increasing callId, mostly to keep track of RPCs */ /** Monotonically increasing callId, mostly to keep track of RPCs */
required int32 callId = 1; required uint32 callId = 1;
/** The request bytes */ }
optional bytes request = 2; /**
* The RPC request body
*/
message RpcRequestBody {
/** Name of the RPC method */
required string methodName = 1;
/** protocol version of class declaring the called method */
optional uint64 clientProtocolVersion = 2;
/** Bytes corresponding to the client protobuf request */
optional bytes request = 3;
} }
/**
* The RPC response header
*/
message RpcResponseHeader {
/** Echo back the callId the client sent */
required int32 callId = 1;
/** Did the RPC execution encounter an error at the server */
enum Status {
SUCCESS = 0;
ERROR = 1;
FATAL = 2;
}
required Status status = 2;
}
/**
* The RPC response body
*/
message RpcResponseBody {
/** Optional response bytes */
optional bytes response = 1;
}
/** /**
* At the RPC layer, this message is used to indicate * At the RPC layer, this message is used to indicate
* the server side exception to the RPC client. * the server side exception to the RPC client.
@ -86,22 +123,3 @@ message RpcException {
/** Exception stack trace from the server side */ /** Exception stack trace from the server side */
optional string stackTrace = 2; optional string stackTrace = 2;
} }
/**
* The complete RPC response message
*/
message RpcResponse {
/** Echo back the callId the client sent */
required int32 callId = 1;
/** Did the RPC execution encounter an error at the server */
enum Status {
SUCCESS = 0;
ERROR = 1;
FATAL = 2;
}
required Status status = 2;
/** Optional response bytes */
optional bytes response = 3;
/** Optional exception when error is true*/
optional RpcException exception = 4;
}

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
import org.junit.Assert;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import org.junit.experimental.categories.Category;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* Test for testing protocol buffer based RPC mechanism.
* This test depends on test.proto definition of types in
* hbase-server/src/test/protobuf/test.proto
* and protobuf service definition from
* hbase-server/src/test/protobuf/test_rpc_service.proto
*/
@Category(MediumTests.class)
public class TestProtoBufRpc {
public final static String ADDRESS = "0.0.0.0";
public final static int PORT = 0;
private static InetSocketAddress addr;
private static Configuration conf;
private static RpcServer server;
public interface TestRpcService
extends TestProtobufRpcProto.BlockingInterface, VersionedProtocol {
public long VERSION = 1;
}
public static class PBServerImpl implements TestRpcService {
@Override
public EmptyResponseProto ping(RpcController unused,
EmptyRequestProto request) throws ServiceException {
return EmptyResponseProto.newBuilder().build();
}
@Override
public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
throws ServiceException {
return EchoResponseProto.newBuilder().setMessage(request.getMessage())
.build();
}
@Override
public EmptyResponseProto error(RpcController unused,
EmptyRequestProto request) throws ServiceException {
throw new ServiceException("error", new IOException("error"));
}
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
// TODO Auto-generated method stub
return 0;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
// TODO Auto-generated method stub
return null;
}
}
@Before
public void setUp() throws IOException { // Setup server for both protocols
conf = new Configuration();
// Set RPC engine to protobuf RPC engine
HBaseRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
// Create server side implementation
PBServerImpl serverImpl = new PBServerImpl();
// Get RPC server for server side implementation
server = HBaseRPC.getServer(TestRpcService.class,serverImpl,
new Class[]{TestRpcService.class},
ADDRESS, PORT, 10, 10, true, conf, 0);
addr = server.getListenerAddress();
server.start();
}
@After
public void tearDown() throws Exception {
server.stop();
}
private static TestRpcService getClient() throws IOException {
// Set RPC engine to protobuf RPC engine
HBaseRPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
return (TestRpcService) HBaseRPC.getProxy(TestRpcService.class, 0,
addr, conf, 10000);
}
@Test
public void testProtoBufRpc() throws Exception {
TestRpcService client = getClient();
testProtoBufRpc(client);
}
// separated test out so that other tests can call it.
public static void testProtoBufRpc(TestRpcService client) throws Exception {
// Test ping method
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
client.ping(null, emptyRequest);
// Test echo method
EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
.setMessage("hello").build();
EchoResponseProto echoResponse = client.echo(null, echoRequest);
Assert.assertEquals(echoResponse.getMessage(), "hello");
// Test error method - error should be thrown as RemoteException
try {
client.error(null, emptyRequest);
Assert.fail("Expected exception is not thrown");
} catch (ServiceException e) {
}
}
}

View File

@ -0,0 +1,396 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: test_rpc_service.proto
package org.apache.hadoop.hbase.ipc.protobuf.generated;
public final class TestRpcServiceProtos {
private TestRpcServiceProtos() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
}
public static abstract class TestProtobufRpcProto
implements com.google.protobuf.Service {
protected TestProtobufRpcProto() {}
public interface Interface {
public abstract void ping(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto> done);
public abstract void echo(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto> done);
public abstract void error(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto> done);
}
public static com.google.protobuf.Service newReflectiveService(
final Interface impl) {
return new TestProtobufRpcProto() {
@java.lang.Override
public void ping(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto> done) {
impl.ping(controller, request, done);
}
@java.lang.Override
public void echo(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto> done) {
impl.echo(controller, request, done);
}
@java.lang.Override
public void error(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto> done) {
impl.error(controller, request, done);
}
};
}
public static com.google.protobuf.BlockingService
newReflectiveBlockingService(final BlockingInterface impl) {
return new com.google.protobuf.BlockingService() {
public final com.google.protobuf.Descriptors.ServiceDescriptor
getDescriptorForType() {
return getDescriptor();
}
public final com.google.protobuf.Message callBlockingMethod(
com.google.protobuf.Descriptors.MethodDescriptor method,
com.google.protobuf.RpcController controller,
com.google.protobuf.Message request)
throws com.google.protobuf.ServiceException {
if (method.getService() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"Service.callBlockingMethod() given method descriptor for " +
"wrong service type.");
}
switch(method.getIndex()) {
case 0:
return impl.ping(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto)request);
case 1:
return impl.echo(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto)request);
case 2:
return impl.error(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto)request);
default:
throw new java.lang.AssertionError("Can't get here.");
}
}
public final com.google.protobuf.Message
getRequestPrototype(
com.google.protobuf.Descriptors.MethodDescriptor method) {
if (method.getService() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"Service.getRequestPrototype() given method " +
"descriptor for wrong service type.");
}
switch(method.getIndex()) {
case 0:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto.getDefaultInstance();
case 1:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto.getDefaultInstance();
case 2:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
}
public final com.google.protobuf.Message
getResponsePrototype(
com.google.protobuf.Descriptors.MethodDescriptor method) {
if (method.getService() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"Service.getResponsePrototype() given method " +
"descriptor for wrong service type.");
}
switch(method.getIndex()) {
case 0:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance();
case 1:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto.getDefaultInstance();
case 2:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
}
};
}
public abstract void ping(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto> done);
public abstract void echo(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto> done);
public abstract void error(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto> done);
public static final
com.google.protobuf.Descriptors.ServiceDescriptor
getDescriptor() {
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.getDescriptor().getServices().get(0);
}
public final com.google.protobuf.Descriptors.ServiceDescriptor
getDescriptorForType() {
return getDescriptor();
}
public final void callMethod(
com.google.protobuf.Descriptors.MethodDescriptor method,
com.google.protobuf.RpcController controller,
com.google.protobuf.Message request,
com.google.protobuf.RpcCallback<
com.google.protobuf.Message> done) {
if (method.getService() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"Service.callMethod() given method descriptor for wrong " +
"service type.");
}
switch(method.getIndex()) {
case 0:
this.ping(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto)request,
com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto>specializeCallback(
done));
return;
case 1:
this.echo(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto)request,
com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto>specializeCallback(
done));
return;
case 2:
this.error(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto)request,
com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto>specializeCallback(
done));
return;
default:
throw new java.lang.AssertionError("Can't get here.");
}
}
public final com.google.protobuf.Message
getRequestPrototype(
com.google.protobuf.Descriptors.MethodDescriptor method) {
if (method.getService() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"Service.getRequestPrototype() given method " +
"descriptor for wrong service type.");
}
switch(method.getIndex()) {
case 0:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto.getDefaultInstance();
case 1:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto.getDefaultInstance();
case 2:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
}
public final com.google.protobuf.Message
getResponsePrototype(
com.google.protobuf.Descriptors.MethodDescriptor method) {
if (method.getService() != getDescriptor()) {
throw new java.lang.IllegalArgumentException(
"Service.getResponsePrototype() given method " +
"descriptor for wrong service type.");
}
switch(method.getIndex()) {
case 0:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance();
case 1:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto.getDefaultInstance();
case 2:
return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance();
default:
throw new java.lang.AssertionError("Can't get here.");
}
}
public static Stub newStub(
com.google.protobuf.RpcChannel channel) {
return new Stub(channel);
}
public static final class Stub extends org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto implements Interface {
private Stub(com.google.protobuf.RpcChannel channel) {
this.channel = channel;
}
private final com.google.protobuf.RpcChannel channel;
public com.google.protobuf.RpcChannel getChannel() {
return channel;
}
public void ping(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto> done) {
channel.callMethod(
getDescriptor().getMethods().get(0),
controller,
request,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance(),
com.google.protobuf.RpcUtil.generalizeCallback(
done,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.class,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance()));
}
public void echo(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto> done) {
channel.callMethod(
getDescriptor().getMethods().get(1),
controller,
request,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto.getDefaultInstance(),
com.google.protobuf.RpcUtil.generalizeCallback(
done,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto.class,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto.getDefaultInstance()));
}
public void error(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request,
com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto> done) {
channel.callMethod(
getDescriptor().getMethods().get(2),
controller,
request,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance(),
com.google.protobuf.RpcUtil.generalizeCallback(
done,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.class,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance()));
}
}
public static BlockingInterface newBlockingStub(
com.google.protobuf.BlockingRpcChannel channel) {
return new BlockingStub(channel);
}
public interface BlockingInterface {
public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto ping(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request)
throws com.google.protobuf.ServiceException;
public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto echo(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto request)
throws com.google.protobuf.ServiceException;
public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto error(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request)
throws com.google.protobuf.ServiceException;
}
private static final class BlockingStub implements BlockingInterface {
private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) {
this.channel = channel;
}
private final com.google.protobuf.BlockingRpcChannel channel;
public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto ping(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request)
throws com.google.protobuf.ServiceException {
return (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto) channel.callBlockingMethod(
getDescriptor().getMethods().get(0),
controller,
request,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance());
}
public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto echo(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto request)
throws com.google.protobuf.ServiceException {
return (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto) channel.callBlockingMethod(
getDescriptor().getMethods().get(1),
controller,
request,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto.getDefaultInstance());
}
public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto error(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request)
throws com.google.protobuf.ServiceException {
return (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto) channel.callBlockingMethod(
getDescriptor().getMethods().get(2),
controller,
request,
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance());
}
}
}
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static com.google.protobuf.Descriptors.FileDescriptor
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\026test_rpc_service.proto\032\ntest.proto2\250\001\n" +
"\024TestProtobufRpcProto\022/\n\004ping\022\022.EmptyReq" +
"uestProto\032\023.EmptyResponseProto\022-\n\004echo\022\021" +
".EchoRequestProto\032\022.EchoResponseProto\0220\n" +
"\005error\022\022.EmptyRequestProto\032\023.EmptyRespon" +
"seProtoBL\n.org.apache.hadoop.hbase.ipc.p" +
"rotobuf.generatedB\024TestRpcServiceProtos\210" +
"\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
return null;
}
};
com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.getDescriptor(),
}, assigner);
}
// @@protoc_insertion_point(outer_class_scope)
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated";
option java_outer_classname = "TestProtos";
option java_generate_equals_and_hash = true;
message EmptyRequestProto {
}
message EmptyResponseProto {
}
message EchoRequestProto {
required string message = 1;
}
message EchoResponseProto {
required string message = 1;
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated";
option java_outer_classname = "TestRpcServiceProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
import "test.proto";
/**
* A protobuf service for use in tests
*/
service TestProtobufRpcProto {
rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
rpc echo(EchoRequestProto) returns (EchoResponseProto);
rpc error(EmptyRequestProto) returns (EmptyResponseProto);
}