From 544691106612562b9fdd111f59f9d7cf03725658 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Wed, 18 Apr 2012 18:53:30 +0000 Subject: [PATCH] HBASE-5810 HBASE-5620 Convert the client protocol of HRegionInterface to PB addendum git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1327629 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/ipc/SecureRpcEngine.java | 42 +++++--- .../hadoop/hbase/client/ScannerCallable.java | 3 +- .../hadoop/hbase/client/ServerCallable.java | 10 +- .../hadoop/hbase/ipc/ExecRPCInvoker.java | 10 +- .../org/apache/hadoop/hbase/ipc/HBaseRPC.java | 28 +---- .../apache/hadoop/hbase/ipc/Invocation.java | 12 +++ .../apache/hadoop/hbase/ipc/RpcEngine.java | 8 -- .../hadoop/hbase/ipc/WritableRpcEngine.java | 101 ++++++++---------- 8 files changed, 96 insertions(+), 118 deletions(-) diff --git a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java index 8219bea7993..39d20e6f941 100644 --- a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java +++ b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java @@ -20,30 +20,25 @@ package org.apache.hadoop.hbase.ipc; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; import org.apache.hadoop.hbase.util.Objects; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate; -import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; +import com.google.protobuf.ServiceException; + import javax.net.SocketFactory; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.lang.reflect.*; -import java.net.ConnectException; import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; import java.util.HashMap; import java.util.Map; @@ -161,14 +156,26 @@ public class SecureRpcEngine implements RpcEngine { if (logDebug) { startTime = System.currentTimeMillis(); } - HbaseObjectWritable value = (HbaseObjectWritable) - client.call(new Invocation(method, args), address, - protocol, ticket, rpcTimeout); - if (logDebug) { - long callTime = System.currentTimeMillis() - startTime; - LOG.debug("Call: " + method.getName() + " " + callTime); + try { + HbaseObjectWritable value = (HbaseObjectWritable) + client.call(new Invocation(method, args), address, + protocol, ticket, rpcTimeout); + if (logDebug) { + long callTime = System.currentTimeMillis() - startTime; + LOG.debug("Call: " + method.getName() + " " + callTime); + } + return value.get(); + } catch (Throwable t) { + // For protobuf protocols, ServiceException is expected + if (Invocation.PROTOBUF_PROTOCOLS.contains(protocol)) { + if (t instanceof RemoteException) { + Throwable cause = ((RemoteException)t).unwrapRemoteException(); + throw new ServiceException(cause); + } + throw new ServiceException(t); + } + throw t; } - return value.get(); } /* close the IPC client that's responsible for this invoker's RPCs */ @@ -390,6 +397,9 @@ public class SecureRpcEngine implements RpcEngine { if (target instanceof IOException) { throw (IOException)target; } + if (target instanceof ServiceException) { + throw ProtobufUtil.getRemoteException((ServiceException)target); + } IOException ioe = new IOException(target.toString()); ioe.setStackTrace(target.getStackTrace()); throw ioe; diff --git a/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index fe80fcf424f..46b1c5659b6 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -141,11 +141,10 @@ public class ScannerCallable extends ServerCallable { } updateResultsMetrics(rrs); } catch (IOException e) { - IOException ioe = null; + IOException ioe = e; if (e instanceof RemoteException) { ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e); } - if (ioe == null) throw new IOException(e); if (ioe instanceof NotServingRegionException) { // Throw a DNRE so that we break out of cycle of calling NSRE // when what we need is to open scanner against new location. diff --git a/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java b/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java index 2a9d86ed5d5..cd4cccbf6f9 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java +++ b/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java @@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.protobuf.ClientProtocol; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.ipc.RemoteException; +import com.google.protobuf.ServiceException; + /** * Abstract class that implements {@link Callable}. Implementation stipulates * return type and method we actually invoke on remote Server. Usually @@ -231,7 +233,13 @@ public abstract class ServerCallable implements Callable { if (t instanceof RemoteException) { t = ((RemoteException)t).unwrapRemoteException(); } - if (t instanceof DoNotRetryIOException) { + if (t instanceof ServiceException) { + ServiceException se = (ServiceException)t; + Throwable cause = se.getCause(); + if (cause != null && cause instanceof DoNotRetryIOException) { + throw (DoNotRetryIOException)cause; + } + } else if (t instanceof DoNotRetryIOException) { throw (DoNotRetryIOException)t; } return t; diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java b/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java index d71e97ed84d..2fc4a154ba2 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java @@ -88,11 +88,11 @@ public class ExecRPCInvoker implements InvocationHandler { return new ExecResult(regionName, value); } }; - ExecResult result = callable.withRetries(); - this.regionName = result.getRegionName(); - LOG.debug("Result is region="+ Bytes.toStringBinary(regionName) + - ", value="+result.getValue()); - return result.getValue(); + ExecResult result = callable.withRetries(); + this.regionName = result.getRegionName(); + LOG.debug("Result is region="+ Bytes.toStringBinary(regionName) + + ", value="+result.getValue()); + return result.getValue(); } return null; diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java index 3a3a79f5026..fec88addc1b 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java @@ -31,9 +31,9 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.io.Writable; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.ReflectionUtils; + import javax.net.SocketFactory; import java.io.IOException; -import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.ConnectException; import java.net.InetSocketAddress; @@ -138,7 +138,6 @@ public class HBaseRPC { /** * A version mismatch for the RPC protocol. */ - @SuppressWarnings("serial") public static class VersionMismatch extends IOException { private static final long serialVersionUID = 0; private String interfaceName; @@ -345,31 +344,6 @@ public class HBaseRPC { } } - /** - * Expert: Make multiple, parallel calls to a set of servers. - * - * @param method method to invoke - * @param params array of parameters - * @param addrs array of addresses - * @param conf configuration - * @return values - * @throws IOException e - * @deprecated Instead of calling statically, use - * {@link HBaseRPC#getProtocolEngine(Class, org.apache.hadoop.conf.Configuration)} - * to obtain an {@link RpcEngine} instance and then use - * {@link RpcEngine#call(java.lang.reflect.Method, Object[][], java.net.InetSocketAddress[], Class, org.apache.hadoop.hbase.security.User, org.apache.hadoop.conf.Configuration)} - */ - @Deprecated - public static Object[] call(Method method, Object[][] params, - InetSocketAddress[] addrs, - Class protocol, - User ticket, - Configuration conf) - throws IOException, InterruptedException { - return getProtocolEngine(protocol, conf) - .call(method, params, addrs, protocol, ticket, conf); - } - /** * Construct a server for a protocol implementation instance listening on a * port and address. diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java b/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java index b7afa58db0c..57c94435fec 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java @@ -25,12 +25,15 @@ import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.hbase.protobuf.AdminProtocol; import org.apache.hadoop.hbase.protobuf.ClientProtocol; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.io.VersionMismatchException; @@ -58,6 +61,15 @@ public class Invocation extends VersionedWritable implements Configurable { Long.valueOf(ClientProtocol.VERSION)); } + // For protobuf protocols, which use ServiceException, instead of IOException + protected static final Set> + PROTOBUF_PROTOCOLS = new HashSet>(); + + static { + PROTOBUF_PROTOCOLS.add(ClientProtocol.class); + PROTOBUF_PROTOCOLS.add(AdminProtocol.class); + } + private static byte RPC_VERSION = 1; public Invocation() {} diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java b/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java index dd0a1bcd2e7..52d179db1df 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java @@ -19,7 +19,6 @@ */ package org.apache.hadoop.hbase.ipc; -import java.lang.reflect.Method; import java.io.IOException; import java.net.InetSocketAddress; import javax.net.SocketFactory; @@ -42,17 +41,10 @@ interface RpcEngine { /** Stop this proxy. */ void stopProxy(VersionedProtocol proxy); - /** Expert: Make multiple, parallel calls to a set of servers. */ - Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, - Class protocol, - User ticket, Configuration conf) - throws IOException, InterruptedException; - /** Construct a server for a protocol implementation instance. */ RpcServer getServer(Class protocol, Object instance, Class[] ifaces, String bindAddress, int port, int numHandlers, int metaHandlerCount, boolean verbose, Configuration conf, int highPriorityLevel) throws IOException; - } diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java b/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java index 9f159f26be1..09601b8adaa 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java @@ -22,9 +22,9 @@ package org.apache.hadoop.hbase.ipc; import java.lang.reflect.Proxy; import java.lang.reflect.Method; -import java.lang.reflect.Array; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.UndeclaredThrowableException; import java.net.InetSocketAddress; import java.io.*; @@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Objects; import org.apache.hadoop.io.*; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.hbase.ipc.VersionedProtocol; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.*; @@ -59,7 +59,7 @@ import com.google.protobuf.ServiceException; @InterfaceAudience.Private class WritableRpcEngine implements RpcEngine { // LOG is NOT in hbase subpackage intentionally so that the default HBase - // DEBUG log level does NOT emit RPC-level logging. + // DEBUG log level does NOT emit RPC-level logging. private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine"); /* Cache a client using its socket factory as the hash key */ @@ -95,17 +95,6 @@ class WritableRpcEngine implements RpcEngine { return client; } - /** - * Construct & cache an IPC client with the default SocketFactory - * if no cached client exists. - * - * @param conf Configuration - * @return an IPC client - */ - protected synchronized HBaseClient getClient(Configuration conf) { - return getClient(conf, SocketFactory.getDefault()); - } - /** * Stop a RPC client connection * A RPC client is closed only when its reference count becomes zero. @@ -152,15 +141,27 @@ class WritableRpcEngine implements RpcEngine { startTime = System.currentTimeMillis(); } - HbaseObjectWritable value = (HbaseObjectWritable) - client.call(new Invocation(method, args), address, - protocol, ticket, rpcTimeout); - if (logDebug) { - // FIGURE HOW TO TURN THIS OFF! - long callTime = System.currentTimeMillis() - startTime; - LOG.debug("Call: " + method.getName() + " " + callTime); + try { + HbaseObjectWritable value = (HbaseObjectWritable) + client.call(new Invocation(method, args), address, + protocol, ticket, rpcTimeout); + if (logDebug) { + // FIGURE HOW TO TURN THIS OFF! + long callTime = System.currentTimeMillis() - startTime; + LOG.debug("Call: " + method.getName() + " " + callTime); + } + return value.get(); + } catch (Throwable t) { + // For protobuf protocols, ServiceException is expected + if (Invocation.PROTOBUF_PROTOCOLS.contains(protocol)) { + if (t instanceof RemoteException) { + Throwable cause = ((RemoteException)t).unwrapRemoteException(); + throw new ServiceException(cause); + } + throw new ServiceException(t); + } + throw t; } - return value.get(); } /* close the IPC client that's responsible for this invoker's RPCs */ @@ -185,11 +186,25 @@ class WritableRpcEngine implements RpcEngine { protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); if (proxy instanceof VersionedProtocol) { - long serverVersion = ((VersionedProtocol)proxy) - .getProtocolVersion(protocol.getName(), clientVersion); - if (serverVersion != clientVersion) { - throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion, - serverVersion); + try { + long serverVersion = ((VersionedProtocol)proxy) + .getProtocolVersion(protocol.getName(), clientVersion); + if (serverVersion != clientVersion) { + throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion, + serverVersion); + } + } catch (Throwable t) { + if (t instanceof UndeclaredThrowableException) { + t = t.getCause(); + } + if (t instanceof ServiceException) { + throw ProtobufUtil.getRemoteException((ServiceException)t); + } + if (!(t instanceof IOException)) { + LOG.error("Unexpected throwable object ", t); + throw new IOException(t); + } + throw (IOException)t; } } return proxy; @@ -205,38 +220,6 @@ class WritableRpcEngine implements RpcEngine { } } - - /** Expert: Make multiple, parallel calls to a set of servers. */ - public Object[] call(Method method, Object[][] params, - InetSocketAddress[] addrs, - Class protocol, - User ticket, Configuration conf) - throws IOException, InterruptedException { - - Invocation[] invocations = new Invocation[params.length]; - for (int i = 0; i < params.length; i++) - invocations[i] = new Invocation(method, params[i]); - HBaseClient client = CLIENTS.getClient(conf); - try { - Writable[] wrappedValues = - client.call(invocations, addrs, protocol, ticket); - - if (method.getReturnType() == Void.TYPE) { - return null; - } - - Object[] values = - (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length); - for (int i = 0; i < values.length; i++) - if (wrappedValues[i] != null) - values[i] = ((HbaseObjectWritable)wrappedValues[i]).get(); - - return values; - } finally { - CLIENTS.stopClient(client); - } - } - /** Construct a server for a protocol implementation instance listening on a * port and address. */ public Server getServer(Class protocol,