HBASE-5810 HBASE-5620 Convert the client protocol of HRegionInterface to PB addendum

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1327629 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-04-18 18:53:30 +00:00
parent b346e6e26a
commit 5446911066
8 changed files with 96 additions and 118 deletions

View File

@ -20,30 +20,25 @@ package org.apache.hadoop.hbase.ipc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.hbase.util.Objects;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import com.google.protobuf.ServiceException;
import javax.net.SocketFactory;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.*;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
@ -161,14 +156,26 @@ public class SecureRpcEngine implements RpcEngine {
if (logDebug) {
startTime = System.currentTimeMillis();
}
HbaseObjectWritable value = (HbaseObjectWritable)
client.call(new Invocation(method, args), address,
protocol, ticket, rpcTimeout);
if (logDebug) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
try {
HbaseObjectWritable value = (HbaseObjectWritable)
client.call(new Invocation(method, args), address,
protocol, ticket, rpcTimeout);
if (logDebug) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
} catch (Throwable t) {
// For protobuf protocols, ServiceException is expected
if (Invocation.PROTOBUF_PROTOCOLS.contains(protocol)) {
if (t instanceof RemoteException) {
Throwable cause = ((RemoteException)t).unwrapRemoteException();
throw new ServiceException(cause);
}
throw new ServiceException(t);
}
throw t;
}
return value.get();
}
/* close the IPC client that's responsible for this invoker's RPCs */
@ -390,6 +397,9 @@ public class SecureRpcEngine implements RpcEngine {
if (target instanceof IOException) {
throw (IOException)target;
}
if (target instanceof ServiceException) {
throw ProtobufUtil.getRemoteException((ServiceException)target);
}
IOException ioe = new IOException(target.toString());
ioe.setStackTrace(target.getStackTrace());
throw ioe;

View File

@ -141,11 +141,10 @@ public class ScannerCallable extends ServerCallable<Result[]> {
}
updateResultsMetrics(rrs);
} catch (IOException e) {
IOException ioe = null;
IOException ioe = e;
if (e instanceof RemoteException) {
ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
}
if (ioe == null) throw new IOException(e);
if (ioe instanceof NotServingRegionException) {
// Throw a DNRE so that we break out of cycle of calling NSRE
// when what we need is to open scanner against new location.

View File

@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
import com.google.protobuf.ServiceException;
/**
* Abstract class that implements {@link Callable}. Implementation stipulates
* return type and method we actually invoke on remote Server. Usually
@ -231,7 +233,13 @@ public abstract class ServerCallable<T> implements Callable<T> {
if (t instanceof RemoteException) {
t = ((RemoteException)t).unwrapRemoteException();
}
if (t instanceof DoNotRetryIOException) {
if (t instanceof ServiceException) {
ServiceException se = (ServiceException)t;
Throwable cause = se.getCause();
if (cause != null && cause instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException)cause;
}
} else if (t instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException)t;
}
return t;

View File

@ -88,11 +88,11 @@ public class ExecRPCInvoker implements InvocationHandler {
return new ExecResult(regionName, value);
}
};
ExecResult result = callable.withRetries();
this.regionName = result.getRegionName();
LOG.debug("Result is region="+ Bytes.toStringBinary(regionName) +
", value="+result.getValue());
return result.getValue();
ExecResult result = callable.withRetries();
this.regionName = result.getRegionName();
LOG.debug("Result is region="+ Bytes.toStringBinary(regionName) +
", value="+result.getValue());
return result.getValue();
}
return null;

View File

@ -31,9 +31,9 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ReflectionUtils;
import javax.net.SocketFactory;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetSocketAddress;
@ -138,7 +138,6 @@ public class HBaseRPC {
/**
* A version mismatch for the RPC protocol.
*/
@SuppressWarnings("serial")
public static class VersionMismatch extends IOException {
private static final long serialVersionUID = 0;
private String interfaceName;
@ -345,31 +344,6 @@ public class HBaseRPC {
}
}
/**
* Expert: Make multiple, parallel calls to a set of servers.
*
* @param method method to invoke
* @param params array of parameters
* @param addrs array of addresses
* @param conf configuration
* @return values
* @throws IOException e
* @deprecated Instead of calling statically, use
* {@link HBaseRPC#getProtocolEngine(Class, org.apache.hadoop.conf.Configuration)}
* to obtain an {@link RpcEngine} instance and then use
* {@link RpcEngine#call(java.lang.reflect.Method, Object[][], java.net.InetSocketAddress[], Class, org.apache.hadoop.hbase.security.User, org.apache.hadoop.conf.Configuration)}
*/
@Deprecated
public static Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs,
Class<? extends VersionedProtocol> protocol,
User ticket,
Configuration conf)
throws IOException, InterruptedException {
return getProtocolEngine(protocol, conf)
.call(method, params, addrs, protocol, ticket, conf);
}
/**
* Construct a server for a protocol implementation instance listening on a
* port and address.

View File

@ -25,12 +25,15 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.protobuf.AdminProtocol;
import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.io.VersionMismatchException;
@ -58,6 +61,15 @@ public class Invocation extends VersionedWritable implements Configurable {
Long.valueOf(ClientProtocol.VERSION));
}
// For protobuf protocols, which use ServiceException, instead of IOException
protected static final Set<Class<?>>
PROTOBUF_PROTOCOLS = new HashSet<Class<?>>();
static {
PROTOBUF_PROTOCOLS.add(ClientProtocol.class);
PROTOBUF_PROTOCOLS.add(AdminProtocol.class);
}
private static byte RPC_VERSION = 1;
public Invocation() {}

View File

@ -19,7 +19,6 @@
*/
package org.apache.hadoop.hbase.ipc;
import java.lang.reflect.Method;
import java.io.IOException;
import java.net.InetSocketAddress;
import javax.net.SocketFactory;
@ -42,17 +41,10 @@ interface RpcEngine {
/** Stop this proxy. */
void stopProxy(VersionedProtocol proxy);
/** Expert: Make multiple, parallel calls to a set of servers. */
Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
Class<? extends VersionedProtocol> protocol,
User ticket, Configuration conf)
throws IOException, InterruptedException;
/** Construct a server for a protocol implementation instance. */
RpcServer getServer(Class<? extends VersionedProtocol> protocol, Object instance,
Class<?>[] ifaces, String bindAddress,
int port, int numHandlers, int metaHandlerCount,
boolean verbose, Configuration conf, int highPriorityLevel)
throws IOException;
}

View File

@ -22,9 +22,9 @@ package org.apache.hadoop.hbase.ipc;
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.io.*;
@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Objects;
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.*;
@ -59,7 +59,7 @@ import com.google.protobuf.ServiceException;
@InterfaceAudience.Private
class WritableRpcEngine implements RpcEngine {
// LOG is NOT in hbase subpackage intentionally so that the default HBase
// DEBUG log level does NOT emit RPC-level logging.
// DEBUG log level does NOT emit RPC-level logging.
private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine");
/* Cache a client using its socket factory as the hash key */
@ -95,17 +95,6 @@ class WritableRpcEngine implements RpcEngine {
return client;
}
/**
* Construct & cache an IPC client with the default SocketFactory
* if no cached client exists.
*
* @param conf Configuration
* @return an IPC client
*/
protected synchronized HBaseClient getClient(Configuration conf) {
return getClient(conf, SocketFactory.getDefault());
}
/**
* Stop a RPC client connection
* A RPC client is closed only when its reference count becomes zero.
@ -152,15 +141,27 @@ class WritableRpcEngine implements RpcEngine {
startTime = System.currentTimeMillis();
}
HbaseObjectWritable value = (HbaseObjectWritable)
client.call(new Invocation(method, args), address,
protocol, ticket, rpcTimeout);
if (logDebug) {
// FIGURE HOW TO TURN THIS OFF!
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
try {
HbaseObjectWritable value = (HbaseObjectWritable)
client.call(new Invocation(method, args), address,
protocol, ticket, rpcTimeout);
if (logDebug) {
// FIGURE HOW TO TURN THIS OFF!
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
} catch (Throwable t) {
// For protobuf protocols, ServiceException is expected
if (Invocation.PROTOBUF_PROTOCOLS.contains(protocol)) {
if (t instanceof RemoteException) {
Throwable cause = ((RemoteException)t).unwrapRemoteException();
throw new ServiceException(cause);
}
throw new ServiceException(t);
}
throw t;
}
return value.get();
}
/* close the IPC client that's responsible for this invoker's RPCs */
@ -185,11 +186,25 @@ class WritableRpcEngine implements RpcEngine {
protocol.getClassLoader(), new Class[] { protocol },
new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
if (proxy instanceof VersionedProtocol) {
long serverVersion = ((VersionedProtocol)proxy)
.getProtocolVersion(protocol.getName(), clientVersion);
if (serverVersion != clientVersion) {
throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
serverVersion);
try {
long serverVersion = ((VersionedProtocol)proxy)
.getProtocolVersion(protocol.getName(), clientVersion);
if (serverVersion != clientVersion) {
throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
serverVersion);
}
} catch (Throwable t) {
if (t instanceof UndeclaredThrowableException) {
t = t.getCause();
}
if (t instanceof ServiceException) {
throw ProtobufUtil.getRemoteException((ServiceException)t);
}
if (!(t instanceof IOException)) {
LOG.error("Unexpected throwable object ", t);
throw new IOException(t);
}
throw (IOException)t;
}
}
return proxy;
@ -205,38 +220,6 @@ class WritableRpcEngine implements RpcEngine {
}
}
/** Expert: Make multiple, parallel calls to a set of servers. */
public Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs,
Class<? extends VersionedProtocol> protocol,
User ticket, Configuration conf)
throws IOException, InterruptedException {
Invocation[] invocations = new Invocation[params.length];
for (int i = 0; i < params.length; i++)
invocations[i] = new Invocation(method, params[i]);
HBaseClient client = CLIENTS.getClient(conf);
try {
Writable[] wrappedValues =
client.call(invocations, addrs, protocol, ticket);
if (method.getReturnType() == Void.TYPE) {
return null;
}
Object[] values =
(Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
for (int i = 0; i < values.length; i++)
if (wrappedValues[i] != null)
values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
return values;
} finally {
CLIENTS.stopClient(client);
}
}
/** Construct a server for a protocol implementation instance listening on a
* port and address. */
public Server getServer(Class<? extends VersionedProtocol> protocol,