HBASE-7460 Cleanup RPC client connection layers

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1435414 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Helmling 2013-01-18 23:32:39 +00:00
parent 7409c4ff9d
commit da18540ea3
25 changed files with 275 additions and 534 deletions

View File

@ -283,7 +283,7 @@ org.apache.hadoop.hbase.HBaseConfiguration;
<%def userTables> <%def userTables>
<%java> <%java>
HTableDescriptor[] tables = admin.listTables(); HTableDescriptor[] tables = admin.listTables();
HConnectionManager.deleteConnection(admin.getConfiguration(), false); HConnectionManager.deleteConnection(admin.getConfiguration());
</%java> </%java>
<%if (tables != null && tables.length > 0)%> <%if (tables != null && tables.length > 0)%>
<table class="table table-striped"> <table class="table table-striped">

View File

@ -75,6 +75,8 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC; import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
import org.apache.hadoop.hbase.ipc.RpcClientEngine;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
@ -117,7 +119,7 @@ import com.google.protobuf.ServiceException;
* <p>But sharing connections * <p>But sharing connections
* makes clean up of {@link HConnection} instances a little awkward. Currently, * makes clean up of {@link HConnection} instances a little awkward. Currently,
* clients cleanup by calling * clients cleanup by calling
* {@link #deleteConnection(Configuration, boolean)}. This will shutdown the * {@link #deleteConnection(Configuration)}. This will shutdown the
* zookeeper connection the HConnection was using and clean up all * zookeeper connection the HConnection was using and clean up all
* HConnection resources as well as stopping proxies to servers out on the * HConnection resources as well as stopping proxies to servers out on the
* cluster. Not running the cleanup will not end the world; it'll * cluster. Not running the cleanup will not end the world; it'll
@ -138,7 +140,7 @@ import com.google.protobuf.ServiceException;
* } * }
* </pre> * </pre>
* <p>Cleanup used to be done inside in a shutdown hook. On startup we'd * <p>Cleanup used to be done inside in a shutdown hook. On startup we'd
* register a shutdown hook that called {@link #deleteAllConnections(boolean)} * register a shutdown hook that called {@link #deleteAllConnections()}
* on its way out but the order in which shutdown hooks run is not defined so * on its way out but the order in which shutdown hooks run is not defined so
* were problematic for clients of HConnection that wanted to register their * were problematic for clients of HConnection that wanted to register their
* own shutdown hooks so we removed ours though this shifts the onus for * own shutdown hooks so we removed ours though this shifts the onus for
@ -212,7 +214,7 @@ public class HConnectionManager {
connection = new HConnectionImplementation(conf, true); connection = new HConnectionImplementation(conf, true);
HBASE_INSTANCES.put(connectionKey, connection); HBASE_INSTANCES.put(connectionKey, connection);
} else if (connection.isClosed()) { } else if (connection.isClosed()) {
HConnectionManager.deleteConnection(connectionKey, true, true); HConnectionManager.deleteConnection(connectionKey, true);
connection = new HConnectionImplementation(conf, true); connection = new HConnectionImplementation(conf, true);
HBASE_INSTANCES.put(connectionKey, connection); HBASE_INSTANCES.put(connectionKey, connection);
} }
@ -244,14 +246,9 @@ public class HConnectionManager {
* @param conf * @param conf
* configuration whose identity is used to find {@link HConnection} * configuration whose identity is used to find {@link HConnection}
* instance. * instance.
* @param stopProxy
* Shuts down all the proxy's put up to cluster members including to
* cluster HMaster. Calls
* {@link HBaseClientRPC#stopProxy(IpcProtocol)}
* .
*/ */
public static void deleteConnection(Configuration conf, boolean stopProxy) { public static void deleteConnection(Configuration conf) {
deleteConnection(new HConnectionKey(conf), stopProxy, false); deleteConnection(new HConnectionKey(conf), false);
} }
/** /**
@ -262,40 +259,37 @@ public class HConnectionManager {
* @param connection * @param connection
*/ */
public static void deleteStaleConnection(HConnection connection) { public static void deleteStaleConnection(HConnection connection) {
deleteConnection(connection, true, true); deleteConnection(connection, true);
} }
/** /**
* Delete information for all connections. * Delete information for all connections.
* @param stopProxy stop the proxy as well
* @throws IOException * @throws IOException
*/ */
public static void deleteAllConnections(boolean stopProxy) { public static void deleteAllConnections() {
synchronized (HBASE_INSTANCES) { synchronized (HBASE_INSTANCES) {
Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>(); Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
connectionKeys.addAll(HBASE_INSTANCES.keySet()); connectionKeys.addAll(HBASE_INSTANCES.keySet());
for (HConnectionKey connectionKey : connectionKeys) { for (HConnectionKey connectionKey : connectionKeys) {
deleteConnection(connectionKey, stopProxy, false); deleteConnection(connectionKey, false);
} }
HBASE_INSTANCES.clear(); HBASE_INSTANCES.clear();
} }
} }
private static void deleteConnection(HConnection connection, boolean stopProxy, private static void deleteConnection(HConnection connection, boolean staleConnection) {
boolean staleConnection) {
synchronized (HBASE_INSTANCES) { synchronized (HBASE_INSTANCES) {
for (Entry<HConnectionKey, HConnectionImplementation> connectionEntry : HBASE_INSTANCES for (Entry<HConnectionKey, HConnectionImplementation> connectionEntry : HBASE_INSTANCES
.entrySet()) { .entrySet()) {
if (connectionEntry.getValue() == connection) { if (connectionEntry.getValue() == connection) {
deleteConnection(connectionEntry.getKey(), stopProxy, staleConnection); deleteConnection(connectionEntry.getKey(), staleConnection);
break; break;
} }
} }
} }
} }
private static void deleteConnection(HConnectionKey connectionKey, private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
boolean stopProxy, boolean staleConnection) {
synchronized (HBASE_INSTANCES) { synchronized (HBASE_INSTANCES) {
HConnectionImplementation connection = HBASE_INSTANCES HConnectionImplementation connection = HBASE_INSTANCES
.get(connectionKey); .get(connectionKey);
@ -303,11 +297,9 @@ public class HConnectionManager {
connection.decCount(); connection.decCount();
if (connection.isZeroReference() || staleConnection) { if (connection.isZeroReference() || staleConnection) {
HBASE_INSTANCES.remove(connectionKey); HBASE_INSTANCES.remove(connectionKey);
connection.close(stopProxy); connection.internalClose();
} else if (stopProxy) {
connection.stopProxyOnClose(stopProxy);
} }
}else { } else {
LOG.error("Connection not found in the list, can't delete it "+ LOG.error("Connection not found in the list, can't delete it "+
"(connection key="+connectionKey+"). May be the key was modified?"); "(connection key="+connectionKey+"). May be the key was modified?");
} }
@ -549,6 +541,9 @@ public class HConnectionManager {
private final Configuration conf; private final Configuration conf;
// client RPC
private RpcClientEngine rpcEngine;
// Known region ServerName.toString() -> RegionClient/Admin // Known region ServerName.toString() -> RegionClient/Admin
private final ConcurrentHashMap<String, Map<String, IpcProtocol>> servers = private final ConcurrentHashMap<String, Map<String, IpcProtocol>> servers =
new ConcurrentHashMap<String, Map<String, IpcProtocol>>(); new ConcurrentHashMap<String, Map<String, IpcProtocol>>();
@ -575,7 +570,6 @@ public class HConnectionManager {
private final Set<Integer> regionCachePrefetchDisabledTables = private final Set<Integer> regionCachePrefetchDisabledTables =
new CopyOnWriteArraySet<Integer>(); new CopyOnWriteArraySet<Integer>();
private boolean stopProxy;
private int refCount; private int refCount;
// indicates whether this connection's life cycle is managed (by us) // indicates whether this connection's life cycle is managed (by us)
@ -589,6 +583,9 @@ public class HConnectionManager {
throws ZooKeeperConnectionException { throws ZooKeeperConnectionException {
this.conf = conf; this.conf = conf;
this.managed = managed; this.managed = managed;
// ProtobufRpcClientEngine is the main RpcClientEngine implementation,
// but we maintain access through an interface to allow overriding for tests
this.rpcEngine = new ProtobufRpcClientEngine(conf);
String adminClassName = conf.get(REGION_PROTOCOL_CLASS, String adminClassName = conf.get(REGION_PROTOCOL_CLASS,
DEFAULT_ADMIN_PROTOCOL_CLASS); DEFAULT_ADMIN_PROTOCOL_CLASS);
this.closed = false; this.closed = false;
@ -716,7 +713,7 @@ public class HConnectionManager {
InetSocketAddress isa = InetSocketAddress isa =
new InetSocketAddress(sn.getHostname(), sn.getPort()); new InetSocketAddress(sn.getHostname(), sn.getPort());
MasterProtocol tryMaster = (MasterProtocol)HBaseClientRPC.getProxy( MasterProtocol tryMaster = rpcEngine.getProxy(
masterProtocolState.protocolClass, masterProtocolState.protocolClass,
isa, this.conf, this.rpcTimeout); isa, this.conf, this.rpcTimeout);
@ -724,7 +721,6 @@ public class HConnectionManager {
null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning()) { null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning()) {
return tryMaster; return tryMaster;
} else { } else {
HBaseClientRPC.stopProxy(tryMaster);
String msg = "Can create a proxy to master, but it is not running"; String msg = "Can create a proxy to master, but it is not running";
LOG.info(msg); LOG.info(msg);
throw new MasterNotRunningException(msg); throw new MasterNotRunningException(msg);
@ -897,7 +893,7 @@ public class HConnectionManager {
@Override @Override
public HRegionLocation locateRegion(final byte[] regionName) throws IOException { public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
return locateRegion(HRegionInfo.getTableName(regionName), return locateRegion(HRegionInfo.getTableName(regionName),
HRegionInfo.getStartKey(regionName), false, true); HRegionInfo.getStartKey(regionName), false, true);
} }
@Override @Override
@ -1364,7 +1360,6 @@ public class HConnectionManager {
* @param hostname * @param hostname
* @param port * @param port
* @param protocolClass * @param protocolClass
* @param version
* @return Proxy. * @return Proxy.
* @throws IOException * @throws IOException
*/ */
@ -1397,8 +1392,7 @@ public class HConnectionManager {
// Only create isa when we need to. // Only create isa when we need to.
InetSocketAddress address = new InetSocketAddress(hostname, port); InetSocketAddress address = new InetSocketAddress(hostname, port);
// definitely a cache miss. establish an RPC for this RS // definitely a cache miss. establish an RPC for this RS
server = HBaseClientRPC.waitForProxy( server = HBaseClientRPC.waitForProxy(rpcEngine, protocolClass, address, this.conf,
protocolClass, address, this.conf,
this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout); this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
protocols.put(protocol, server); protocols.put(protocol, server);
} catch (RemoteException e) { } catch (RemoteException e) {
@ -1611,9 +1605,6 @@ public class HConnectionManager {
throws MasterNotRunningException { throws MasterNotRunningException {
synchronized (masterAndZKLock) { synchronized (masterAndZKLock) {
if (!isKeepAliveMasterConnectedAndRunning(protocolState)) { if (!isKeepAliveMasterConnectedAndRunning(protocolState)) {
if (protocolState.protocol != null) {
HBaseClientRPC.stopProxy(protocolState.protocol);
}
protocolState.protocol = null; protocolState.protocol = null;
protocolState.protocol = createMasterWithRetries(protocolState); protocolState.protocol = createMasterWithRetries(protocolState);
} }
@ -1688,7 +1679,6 @@ public class HConnectionManager {
private void closeMasterProtocol(MasterProtocolState protocolState) { private void closeMasterProtocol(MasterProtocolState protocolState) {
if (protocolState.protocol != null){ if (protocolState.protocol != null){
LOG.info("Closing master protocol: " + protocolState.protocolClass.getName()); LOG.info("Closing master protocol: " + protocolState.protocolClass.getName());
HBaseClientRPC.stopProxy(protocolState.protocol);
protocolState.protocol = null; protocolState.protocol = null;
} }
protocolState.userCount = 0; protocolState.userCount = 0;
@ -2276,10 +2266,6 @@ public class HConnectionManager {
} }
} }
public void stopProxyOnClose(boolean stopProxy) {
this.stopProxy = stopProxy;
}
/** /**
* Increment this client's reference count. * Increment this client's reference count.
*/ */
@ -2305,21 +2291,15 @@ public class HConnectionManager {
return refCount == 0; return refCount == 0;
} }
void close(boolean stopProxy) { void internalClose() {
if (this.closed) { if (this.closed) {
return; return;
} }
delayedClosing.stop("Closing connection"); delayedClosing.stop("Closing connection");
if (stopProxy) { closeMaster();
closeMaster();
for (Map<String, IpcProtocol> i : servers.values()) {
for (IpcProtocol server: i.values()) {
HBaseClientRPC.stopProxy(server);
}
}
}
closeZooKeeperWatcher(); closeZooKeeperWatcher();
this.servers.clear(); this.servers.clear();
this.rpcEngine.close();
this.closed = true; this.closed = true;
} }
@ -2329,10 +2309,10 @@ public class HConnectionManager {
if (aborted) { if (aborted) {
HConnectionManager.deleteStaleConnection(this); HConnectionManager.deleteStaleConnection(this);
} else { } else {
HConnectionManager.deleteConnection(this, stopProxy, false); HConnectionManager.deleteConnection(this, false);
} }
} else { } else {
close(true); internalClose();
} }
} }
@ -2419,6 +2399,14 @@ public class HConnectionManager {
} }
throw new TableNotFoundException(Bytes.toString(tableName)); throw new TableNotFoundException(Bytes.toString(tableName));
} }
/**
* Override the RpcClientEngine implementation used by this connection.
* <strong>FOR TESTING PURPOSES ONLY!</strong>
*/
void setRpcEngine(RpcClientEngine engine) {
this.rpcEngine = engine;
}
} }
/** /**

View File

@ -1,102 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
/**
* Cache a client using its socket factory as the hash key.
* Enables reuse/sharing of clients on a per SocketFactory basis. A client
* establishes certain configuration dependent characteristics like timeouts,
* tcp-keepalive (true or false), etc. For more details on the characteristics,
* look at {@link HBaseClient#HBaseClient(Configuration, SocketFactory)}
* Creation of dynamic proxies to protocols creates the clients (and increments
* reference count once created), and stopping of the proxies leads to clearing
* out references and when the reference drops to zero, the cache mapping is
* cleared.
*/
class ClientCache {
private Map<SocketFactory, HBaseClient> clients =
new HashMap<SocketFactory, HBaseClient>();
protected ClientCache() {}
/**
* Construct & cache an IPC client with the user-provided SocketFactory
* if no cached client exists.
*
* @param conf Configuration
* @param factory socket factory
* @return an IPC client
*/
@SuppressWarnings("unchecked")
protected synchronized HBaseClient getClient(Configuration conf, SocketFactory factory) {
HBaseClient client = clients.get(factory);
if (client == null) {
Class<? extends HBaseClient> hbaseClientClass = (Class<? extends HBaseClient>) conf
.getClass(HConstants.HBASECLIENT_IMPL, HBaseClient.class);
// Make an hbase client instead of hadoop Client.
try {
Constructor<? extends HBaseClient> cst = hbaseClientClass.getConstructor(
Configuration.class, SocketFactory.class);
client = cst.newInstance(conf, factory);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (NoSuchMethodException e) {
throw new RuntimeException("No matching constructor in "+hbaseClientClass.getName(), e);
}
clients.put(factory, client);
} else {
client.incCount();
}
return client;
}
/**
* Stop a RPC client connection
* A RPC client is closed only when its reference count becomes zero.
* @param client client to stop
*/
protected void stopClient(HBaseClient client) {
synchronized (this) {
client.decCount();
if (client.isZeroReference()) {
clients.remove(client.getSocketFactory());
}
}
if (client.isZeroReference()) {
client.stop();
}
}
}

View File

@ -126,7 +126,6 @@ public class HBaseClient {
protected FailedServers failedServers; protected FailedServers failedServers;
protected final SocketFactory socketFactory; // how to create sockets protected final SocketFactory socketFactory; // how to create sockets
private int refCount = 1;
protected String clusterId; protected String clusterId;
final private static String PING_INTERVAL_NAME = "ipc.ping.interval"; final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
@ -235,31 +234,6 @@ public class HBaseClient {
return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT); return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
} }
/**
* Increment this client's reference count
*
*/
synchronized void incCount() {
refCount++;
}
/**
* Decrement this client's reference count
*
*/
synchronized void decCount() {
refCount--;
}
/**
* Return if this client has no reference
*
* @return true if this client has no reference; false otherwise
*/
synchronized boolean isZeroReference() {
return refCount==0;
}
/** A call waiting for a value. */ /** A call waiting for a value. */
protected class Call { protected class Call {
final int id; // call id final int id; // call id

View File

@ -48,22 +48,6 @@ public class HBaseClientRPC {
protected static final Log LOG = protected static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseClientRPC"); LogFactory.getLog("org.apache.hadoop.ipc.HBaseClientRPC");
/**
* Configuration key for the {@link org.apache.hadoop.hbase.ipc.RpcClientEngine}
* implementation to load to handle connection protocols. Handlers for individual
* protocols can be configured using {@code "hbase.rpc.client.engine." +
* protocol.class.name}.
*/
public static final String RPC_ENGINE_PROP = "hbase.rpc.client.engine";
// cache of RpcEngines by protocol
private static final Map<Class<? extends IpcProtocol>, RpcClientEngine> PROTOCOL_ENGINES =
new HashMap<Class<? extends IpcProtocol>, RpcClientEngine>();
// Track what RpcEngine is used by a proxy class, for stopProxy()
private static final Map<Class<?>, RpcClientEngine> PROXY_ENGINES =
new HashMap<Class<?>, RpcClientEngine>();
// thread-specific RPC timeout, which may override that of RpcEngine // thread-specific RPC timeout, which may override that of RpcEngine
private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() { private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
@Override @Override
@ -72,41 +56,6 @@ public class HBaseClientRPC {
} }
}; };
// set a protocol to use a non-default RpcEngine
static void setProtocolEngine(Configuration conf,
Class<? extends IpcProtocol> protocol, Class<? extends RpcClientEngine> engine) {
conf.setClass(RPC_ENGINE_PROP + "." + protocol.getName(), engine,
RpcClientEngine.class);
}
// return the RpcEngine configured to handle a protocol
static synchronized RpcClientEngine getProtocolEngine(
Class<? extends IpcProtocol> protocol, Configuration conf) {
RpcClientEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
// check for a configured default engine
Class<?> defaultEngine =
conf.getClass(RPC_ENGINE_PROP, ProtobufRpcClientEngine.class);
// check for a per interface override
Class<?> impl = conf.getClass(RPC_ENGINE_PROP + "." + protocol.getName(),
defaultEngine);
LOG.debug("Using " + impl.getName() + " for " + protocol.getName());
engine = (RpcClientEngine) ReflectionUtils.newInstance(impl, conf);
if (protocol.isInterface()) {
PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(), protocol),
engine);
}
PROTOCOL_ENGINES.put(protocol, engine);
}
return engine;
}
// return the RpcEngine that handles a proxy object
private static synchronized RpcClientEngine getProxyEngine(Object proxy) {
return PROXY_ENGINES.get(proxy.getClass());
}
/** /**
* @param protocol protocol interface * @param protocol protocol interface
* @param addr address of remote service * @param addr address of remote service
@ -117,12 +66,13 @@ public class HBaseClientRPC {
* @return proxy * @return proxy
* @throws java.io.IOException e * @throws java.io.IOException e
*/ */
public static IpcProtocol waitForProxy(Class<? extends IpcProtocol> protocol, public static <T extends IpcProtocol> T waitForProxy(RpcClientEngine engine,
InetSocketAddress addr, Class<T> protocol,
Configuration conf, InetSocketAddress addr,
int maxAttempts, Configuration conf,
int rpcTimeout, int maxAttempts,
long timeout) int rpcTimeout,
long timeout)
throws IOException { throws IOException {
// HBase does limited number of reconnects which is different from hadoop. // HBase does limited number of reconnects which is different from hadoop.
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
@ -130,7 +80,7 @@ public class HBaseClientRPC {
int reconnectAttempts = 0; int reconnectAttempts = 0;
while (true) { while (true) {
try { try {
return getProxy(protocol, addr, conf, rpcTimeout); return engine.getProxy(protocol, addr, conf, rpcTimeout);
} catch (SocketTimeoutException te) { } catch (SocketTimeoutException te) {
LOG.info("Problem connecting to server: " + addr); LOG.info("Problem connecting to server: " + addr);
ioe = te; ioe = te;
@ -194,77 +144,6 @@ public class HBaseClientRPC {
} }
} }
/**
* Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address.
*
* @param protocol interface
* @param addr remote address
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout timeout for each RPC
* @return proxy
* @throws java.io.IOException e
*/
public static IpcProtocol getProxy(Class<? extends IpcProtocol> protocol,
InetSocketAddress addr,
Configuration conf,
SocketFactory factory,
int rpcTimeout) throws IOException {
return getProxy(protocol, addr, User.getCurrent(), conf, factory, rpcTimeout);
}
/**
* Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address.
*
* @param protocol interface
* @param addr remote address
* @param ticket ticket
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout timeout for each RPC
* @return proxy
* @throws java.io.IOException e
*/
public static IpcProtocol getProxy(Class<? extends IpcProtocol> protocol,
InetSocketAddress addr, User ticket,
Configuration conf, SocketFactory factory, int rpcTimeout)
throws IOException {
RpcClientEngine engine = getProtocolEngine(protocol, conf);
IpcProtocol proxy = engine.getProxy(protocol, addr, ticket, conf, factory,
Math.min(rpcTimeout, getRpcTimeout()));
return proxy;
}
/**
* Construct a client-side proxy object with the default SocketFactory
*
* @param protocol interface
* @param addr remote address
* @param conf configuration
* @param rpcTimeout timeout for each RPC
* @return a proxy instance
* @throws java.io.IOException e
*/
public static IpcProtocol getProxy(Class<? extends IpcProtocol> protocol,
InetSocketAddress addr, Configuration conf, int rpcTimeout)
throws IOException {
return getProxy(protocol, addr, conf,
NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
}
/**
* Stop this proxy and release its invoker's resource
*
* @param proxy the proxy to be stopped
*/
public static void stopProxy(IpcProtocol proxy) {
if (proxy != null) {
getProxyEngine(proxy).stopProxy(proxy);
}
}
public static void setRpcTimeout(int t) { public static void setRpcTimeout(int t) {
rpcTimeout.set(t); rpcTimeout.set(t);
} }

View File

@ -45,27 +45,24 @@ public class ProtobufRpcClientEngine implements RpcClientEngine {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine"); LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine");
ProtobufRpcClientEngine() { protected HBaseClient client;
super();
public ProtobufRpcClientEngine(Configuration conf) {
this.client = new HBaseClient(conf);
} }
protected final static ClientCache CLIENTS = new ClientCache();
@Override @Override
public IpcProtocol getProxy( public <T extends IpcProtocol> T getProxy(
Class<? extends IpcProtocol> protocol, Class<T> protocol, InetSocketAddress addr,
InetSocketAddress addr, User ticket, Configuration conf, Configuration conf, int rpcTimeout) throws IOException {
SocketFactory factory, int rpcTimeout) throws IOException { final Invoker invoker = new Invoker(protocol, addr, User.getCurrent(), rpcTimeout, client);
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, return (T) Proxy.newProxyInstance(
rpcTimeout);
return (IpcProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker); protocol.getClassLoader(), new Class[]{protocol}, invoker);
} }
@Override @Override
public void stopProxy(IpcProtocol proxy) { public void close() {
if (proxy!=null) { this.client.stop();
((Invoker)Proxy.getInvocationHandler(proxy)).close();
}
} }
static class Invoker implements InvocationHandler { static class Invoker implements InvocationHandler {
@ -75,16 +72,14 @@ public class ProtobufRpcClientEngine implements RpcClientEngine {
private InetSocketAddress address; private InetSocketAddress address;
private User ticket; private User ticket;
private HBaseClient client; private HBaseClient client;
private boolean isClosed = false;
final private int rpcTimeout; final private int rpcTimeout;
public Invoker(Class<? extends IpcProtocol> protocol, public Invoker(Class<? extends IpcProtocol> protocol, InetSocketAddress addr, User ticket,
InetSocketAddress addr, User ticket, Configuration conf, int rpcTimeout, HBaseClient client) throws IOException {
SocketFactory factory, int rpcTimeout) throws IOException {
this.protocol = protocol; this.protocol = protocol;
this.address = addr; this.address = addr;
this.ticket = ticket; this.ticket = ticket;
this.client = CLIENTS.getClient(conf, factory); this.client = client;
this.rpcTimeout = rpcTimeout; this.rpcTimeout = rpcTimeout;
} }
@ -157,13 +152,6 @@ public class ProtobufRpcClientEngine implements RpcClientEngine {
} }
} }
synchronized protected void close() {
if (!isClosed) {
isClosed = true;
CLIENTS.stopClient(client);
}
}
static Message getReturnProtoType(Method method) throws Exception { static Message getReturnProtoType(Method method) throws Exception {
if (returnTypes.containsKey(method.getName())) { if (returnTypes.containsKey(method.getName())) {
return returnTypes.get(method.getName()); return returnTypes.get(method.getName());

View File

@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import javax.net.SocketFactory;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -32,10 +31,9 @@ import java.net.InetSocketAddress;
@InterfaceAudience.Private @InterfaceAudience.Private
public interface RpcClientEngine { public interface RpcClientEngine {
/** Construct a client-side proxy object. */ /** Construct a client-side proxy object. */
IpcProtocol getProxy(Class<? extends IpcProtocol> protocol, <T extends IpcProtocol> T getProxy(Class<T> protocol, InetSocketAddress addr,
InetSocketAddress addr, User ticket, Configuration conf, Configuration conf, int rpcTimeout) throws IOException;
SocketFactory factory, int rpcTimeout) throws IOException;
/** Stop this proxy. */ /** Shutdown this instance */
void stopProxy(IpcProtocol proxy); void close();
} }

View File

@ -107,6 +107,8 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC; import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseServerRPC; import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
import org.apache.hadoop.hbase.ipc.RpcClientEngine;
import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.ipc.ServerRpcController;
@ -328,6 +330,9 @@ public class HRegionServer implements ClientProtocol,
// unit tests. // unit tests.
RpcServer rpcServer; RpcServer rpcServer;
// RPC client for communicating with master
RpcClientEngine rpcClientEngine;
private final InetSocketAddress isa; private final InetSocketAddress isa;
private UncaughtExceptionHandler uncaughtExceptionHandler; private UncaughtExceptionHandler uncaughtExceptionHandler;
@ -841,6 +846,9 @@ public class HRegionServer implements ClientProtocol,
// Create the thread to clean the moved regions list // Create the thread to clean the moved regions list
movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this); movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);
// Setup RPC client for master communication
rpcClientEngine = new ProtobufRpcClientEngine(conf);
} }
/** /**
@ -989,9 +997,9 @@ public class HRegionServer implements ClientProtocol,
// Make sure the proxy is down. // Make sure the proxy is down.
if (this.hbaseMaster != null) { if (this.hbaseMaster != null) {
HBaseClientRPC.stopProxy(this.hbaseMaster);
this.hbaseMaster = null; this.hbaseMaster = null;
} }
this.rpcClientEngine.close();
this.leases.close(); this.leases.close();
if (!killed) { if (!killed) {
@ -1860,10 +1868,8 @@ public class HRegionServer implements ClientProtocol,
try { try {
// Do initial RPC setup. The final argument indicates that the RPC // Do initial RPC setup. The final argument indicates that the RPC
// should retry indefinitely. // should retry indefinitely.
master = (RegionServerStatusProtocol) HBaseClientRPC.waitForProxy( master = HBaseClientRPC.waitForProxy(rpcClientEngine, RegionServerStatusProtocol.class,
RegionServerStatusProtocol.class, isa, this.conf, -1, this.rpcTimeout, this.rpcTimeout);
isa, this.conf, -1,
this.rpcTimeout, this.rpcTimeout);
LOG.info("Connected to master at " + isa); LOG.info("Connected to master at " + isa);
} catch (IOException e) { } catch (IOException e) {
e = e instanceof RemoteException ? e = e instanceof RemoteException ?

View File

@ -148,7 +148,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
this.zkHelper.getZookeeperWatcher().close(); this.zkHelper.getZookeeperWatcher().close();
} }
// Not sure why we're deleting a connection that we never acquired or used // Not sure why we're deleting a connection that we never acquired or used
HConnectionManager.deleteConnection(this.getConf(), true); HConnectionManager.deleteConnection(this.getConf());
} }
@Override @Override

View File

@ -529,13 +529,6 @@
</description> </description>
</property> </property>
<property>
<name>hbase.rpc.client.engine</name>
<value>org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine</value>
<description>Implementation of org.apache.hadoop.hbase.ipc.RpcClientEngine to be
used for client RPC call marshalling.
</description>
</property>
<property> <property>
<name>hbase.rpc.server.engine</name> <name>hbase.rpc.server.engine</name>
<value>org.apache.hadoop.hbase.ipc.ProtobufRpcServerEngine</value> <value>org.apache.hadoop.hbase.ipc.ProtobufRpcServerEngine</value>

View File

@ -318,7 +318,7 @@
} }
} // end else } // end else
HConnectionManager.deleteConnection(hbadmin.getConfiguration(), false); HConnectionManager.deleteConnection(hbadmin.getConfiguration());
%> %>

View File

@ -500,7 +500,7 @@ public class MiniHBaseCluster extends HBaseCluster {
if (this.hbaseCluster != null) { if (this.hbaseCluster != null) {
this.hbaseCluster.shutdown(); this.hbaseCluster.shutdown();
} }
HConnectionManager.deleteAllConnections(false); HConnectionManager.deleteAllConnections();
} }
@Override @Override

View File

@ -177,7 +177,7 @@ public class TestCatalogTracker {
// Join the thread... should exit shortly. // Join the thread... should exit shortly.
t.join(); t.join();
} finally { } finally {
HConnectionManager.deleteConnection(UTIL.getConfiguration(), true); HConnectionManager.deleteConnection(UTIL.getConfiguration());
} }
} }
@ -255,7 +255,7 @@ public class TestCatalogTracker {
} }
} finally { } finally {
// Clear out our doctored connection or could mess up subsequent tests. // Clear out our doctored connection or could mess up subsequent tests.
HConnectionManager.deleteConnection(UTIL.getConfiguration(), true); HConnectionManager.deleteConnection(UTIL.getConfiguration());
} }
} }
@ -282,7 +282,7 @@ public class TestCatalogTracker {
} }
} finally { } finally {
// Clear out our doctored connection or could mess up subsequent tests. // Clear out our doctored connection or could mess up subsequent tests.
HConnectionManager.deleteConnection(UTIL.getConfiguration(), true); HConnectionManager.deleteConnection(UTIL.getConfiguration());
} }
} }
@ -368,7 +368,7 @@ public class TestCatalogTracker {
final CatalogTracker ct = constructAndStartCatalogTracker(connection); final CatalogTracker ct = constructAndStartCatalogTracker(connection);
ct.waitForMeta(100); ct.waitForMeta(100);
} finally { } finally {
HConnectionManager.deleteConnection(UTIL.getConfiguration(), true); HConnectionManager.deleteConnection(UTIL.getConfiguration());
} }
} }
@ -460,7 +460,7 @@ public class TestCatalogTracker {
// Now meta is available. // Now meta is available.
Assert.assertTrue(ct.waitForMeta(10000).equals(SN)); Assert.assertTrue(ct.waitForMeta(10000).equals(SN));
} finally { } finally {
HConnectionManager.deleteConnection(UTIL.getConfiguration(), true); HConnectionManager.deleteConnection(UTIL.getConfiguration());
} }
} }
@ -475,7 +475,7 @@ public class TestCatalogTracker {
* {@link HConnection#getAdmin(String, int)} is called, returns the passed * {@link HConnection#getAdmin(String, int)} is called, returns the passed
* {@link ClientProtocol} instance when {@link HConnection#getClient(String, int)} * {@link ClientProtocol} instance when {@link HConnection#getClient(String, int)}
* is called (Be sure call * is called (Be sure call
* {@link HConnectionManager#deleteConnection(org.apache.hadoop.conf.Configuration, boolean)} * {@link HConnectionManager#deleteConnection(org.apache.hadoop.conf.Configuration)}
* when done with this mocked Connection. * when done with this mocked Connection.
* @throws IOException * @throws IOException
*/ */

View File

@ -208,7 +208,7 @@ public class TestMetaReaderEditorNoCluster {
scan((RpcController)Mockito.any(), (ScanRequest)Mockito.any()); scan((RpcController)Mockito.any(), (ScanRequest)Mockito.any());
} finally { } finally {
if (ct != null) ct.stop(); if (ct != null) ct.stop();
HConnectionManager.deleteConnection(UTIL.getConfiguration(), true); HConnectionManager.deleteConnection(UTIL.getConfiguration());
zkw.close(); zkw.close();
} }
} }

View File

@ -49,8 +49,6 @@ public class TestClientTimeouts {
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration(); Configuration conf = TEST_UTIL.getConfiguration();
RandomTimeoutRpcEngine.setProtocolEngine(conf, MasterAdminProtocol.class);
RandomTimeoutRpcEngine.setProtocolEngine(conf, MasterMonitorProtocol.class);
TEST_UTIL.startMiniCluster(SLAVES); TEST_UTIL.startMiniCluster(SLAVES);
} }
@ -73,26 +71,35 @@ public class TestClientTimeouts {
HConnection lastConnection = null; HConnection lastConnection = null;
boolean lastFailed = false; boolean lastFailed = false;
int initialInvocations = RandomTimeoutRpcEngine.getNumberOfInvocations(); int initialInvocations = RandomTimeoutRpcEngine.getNumberOfInvocations();
for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
lastFailed = false; RandomTimeoutRpcEngine engine = new RandomTimeoutRpcEngine(TEST_UTIL.getConfiguration());
// Ensure the HBaseAdmin uses a new connection by changing Configuration. try {
Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
conf.setLong(HConstants.HBASE_CLIENT_PREFETCH_LIMIT, ++lastLimit); lastFailed = false;
try { // Ensure the HBaseAdmin uses a new connection by changing Configuration.
HBaseAdmin admin = new HBaseAdmin(conf); Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
HConnection connection = admin.getConnection(); conf.setLong(HConstants.HBASE_CLIENT_PREFETCH_LIMIT, ++lastLimit);
assertFalse(connection == lastConnection); try {
// run some admin commands HBaseAdmin admin = new HBaseAdmin(conf);
HBaseAdmin.checkHBaseAvailable(conf); HConnection connection = admin.getConnection();
admin.setBalancerRunning(false, false); assertFalse(connection == lastConnection);
} catch (MasterNotRunningException ex) { lastConnection = connection;
// Since we are randomly throwing SocketTimeoutExceptions, it is possible to get // override the connection's rpc engine for timeout testing
// a MasterNotRunningException. It's a bug if we get other exceptions. ((HConnectionManager.HConnectionImplementation)connection).setRpcEngine(engine);
lastFailed = true; // run some admin commands
HBaseAdmin.checkHBaseAvailable(conf);
admin.setBalancerRunning(false, false);
} catch (MasterNotRunningException ex) {
// Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
// a MasterNotRunningException. It's a bug if we get other exceptions.
lastFailed = true;
}
} }
// Ensure the RandomTimeoutRpcEngine is actually being used.
assertFalse(lastFailed);
assertTrue(RandomTimeoutRpcEngine.getNumberOfInvocations() > initialInvocations);
} finally {
engine.close();
} }
// Ensure the RandomTimeoutRpcEngine is actually being used.
assertFalse(lastFailed);
assertTrue(RandomTimeoutRpcEngine.getNumberOfInvocations() > initialInvocations);
} }
} }

View File

@ -285,7 +285,7 @@ public class TestFromClientSide {
z4.getRecoverableZooKeeper().getZooKeeper().exists("/z4", false); z4.getRecoverableZooKeeper().getZooKeeper().exists("/z4", false);
HConnectionManager.deleteConnection(newConfig, true); HConnectionManager.deleteConnection(newConfig);
try { try {
z2.getRecoverableZooKeeper().getZooKeeper().exists("/z2", false); z2.getRecoverableZooKeeper().getZooKeeper().exists("/z2", false);
assertTrue("We should not have a valid connection for z2", false); assertTrue("We should not have a valid connection for z2", false);
@ -296,7 +296,7 @@ public class TestFromClientSide {
// We expect success here. // We expect success here.
HConnectionManager.deleteConnection(newConfig2, true); HConnectionManager.deleteConnection(newConfig2);
try { try {
z4.getRecoverableZooKeeper().getZooKeeper().exists("/z4", false); z4.getRecoverableZooKeeper().getZooKeeper().exists("/z4", false);
assertTrue("We should not have a valid connection for z4", false); assertTrue("We should not have a valid connection for z4", false);

View File

@ -405,7 +405,7 @@ public class TestHCM {
} finally { } finally {
for (HConnection c: connections) { for (HConnection c: connections) {
// Clean up connections made so we don't interfere w/ subsequent tests. // Clean up connections made so we don't interfere w/ subsequent tests.
HConnectionManager.deleteConnection(c.getConfiguration(), true); HConnectionManager.deleteConnection(c.getConfiguration());
} }
} }
} }

View File

@ -46,29 +46,24 @@ public class RandomTimeoutRpcEngine extends ProtobufRpcClientEngine {
private static final Random RANDOM = new Random(System.currentTimeMillis()); private static final Random RANDOM = new Random(System.currentTimeMillis());
public static double chanceOfTimeout = 0.3; public static double chanceOfTimeout = 0.3;
private static AtomicInteger invokations = new AtomicInteger(); private static AtomicInteger invokations = new AtomicInteger();
public IpcProtocol getProxy( public RandomTimeoutRpcEngine(Configuration conf) {
Class<? extends IpcProtocol> protocol, super(conf);
InetSocketAddress addr, User ticket, }
Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {
@Override
public <T extends IpcProtocol> T getProxy(
Class<T> protocol, InetSocketAddress addr, Configuration conf, int rpcTimeout)
throws IOException {
// Start up the requested-for proxy so we can pass-through calls to the underlying // Start up the requested-for proxy so we can pass-through calls to the underlying
// RpcEngine. Also instantiate and return our own proxy (RandomTimeoutInvocationHandler) // RpcEngine. Also instantiate and return our own proxy (RandomTimeoutInvocationHandler)
// that will either throw exceptions or pass through to the underlying proxy. // that will either throw exceptions or pass through to the underlying proxy.
IpcProtocol actualProxy = super.getProxy(protocol, addr, T actualProxy = super.getProxy(protocol, addr, conf, rpcTimeout);
ticket, conf, factory, rpcTimeout);
RandomTimeoutInvocationHandler invoker = RandomTimeoutInvocationHandler invoker =
new RandomTimeoutInvocationHandler(actualProxy); new RandomTimeoutInvocationHandler(actualProxy);
IpcProtocol object = (IpcProtocol)Proxy.newProxyInstance( T wrapperProxy = (T)Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker); protocol.getClassLoader(), new Class[]{protocol}, invoker);
return object; return wrapperProxy;
}
/**
* Call this in order to set this class to run as the RpcEngine for the given protocol
*/
public static void setProtocolEngine(Configuration conf,
Class<? extends IpcProtocol> protocol) {
HBaseClientRPC.setProtocolEngine(conf, protocol, RandomTimeoutRpcEngine.class);
} }
/** /**

View File

@ -73,28 +73,33 @@ public class TestDelayedRpc {
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start(); rpcServer.start();
TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf);
rpcServer.getListenerAddress(), conf, 1000); try {
TestRpc client = clientEngine.getProxy(TestRpc.class,
rpcServer.getListenerAddress(), conf, 1000);
List<Integer> results = new ArrayList<Integer>(); List<Integer> results = new ArrayList<Integer>();
TestThread th1 = new TestThread(client, true, results); TestThread th1 = new TestThread(client, true, results);
TestThread th2 = new TestThread(client, false, results); TestThread th2 = new TestThread(client, false, results);
TestThread th3 = new TestThread(client, false, results); TestThread th3 = new TestThread(client, false, results);
th1.start(); th1.start();
Thread.sleep(100); Thread.sleep(100);
th2.start(); th2.start();
Thread.sleep(200); Thread.sleep(200);
th3.start(); th3.start();
th1.join(); th1.join();
th2.join(); th2.join();
th3.join(); th3.join();
assertEquals(UNDELAYED, results.get(0).intValue()); assertEquals(UNDELAYED, results.get(0).intValue());
assertEquals(UNDELAYED, results.get(1).intValue()); assertEquals(UNDELAYED, results.get(1).intValue());
assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED : assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :
0xDEADBEEF); 0xDEADBEEF);
} finally {
clientEngine.close();
}
} }
private static class ListAppender extends AppenderSkeleton { private static class ListAppender extends AppenderSkeleton {
@ -136,32 +141,38 @@ public class TestDelayedRpc {
new Class<?>[]{ TestRpcImpl.class }, new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start(); rpcServer.start();
TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class,
rpcServer.getListenerAddress(), conf, 1000);
Thread threads[] = new Thread[MAX_DELAYED_RPC + 1]; ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf);
try {
TestRpc client = clientEngine.getProxy(TestRpc.class,
rpcServer.getListenerAddress(), conf, 1000);
for (int i = 0; i < MAX_DELAYED_RPC; i++) { Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
threads[i] = new TestThread(client, true, null);
threads[i].start(); for (int i = 0; i < MAX_DELAYED_RPC; i++) {
threads[i] = new TestThread(client, true, null);
threads[i].start();
}
/* No warnings till here. */
assertTrue(listAppender.getMessages().isEmpty());
/* This should give a warning. */
threads[MAX_DELAYED_RPC] = new TestThread(client, true, null);
threads[MAX_DELAYED_RPC].start();
for (int i = 0; i < MAX_DELAYED_RPC; i++) {
threads[i].join();
}
assertFalse(listAppender.getMessages().isEmpty());
assertTrue(listAppender.getMessages().get(0).startsWith(
"Too many delayed calls"));
log.removeAppender(listAppender);
} finally {
clientEngine.close();
} }
/* No warnings till here. */
assertTrue(listAppender.getMessages().isEmpty());
/* This should give a warning. */
threads[MAX_DELAYED_RPC] = new TestThread(client, true, null);
threads[MAX_DELAYED_RPC].start();
for (int i = 0; i < MAX_DELAYED_RPC; i++) {
threads[i].join();
}
assertFalse(listAppender.getMessages().isEmpty());
assertTrue(listAppender.getMessages().get(0).startsWith(
"Too many delayed calls"));
log.removeAppender(listAppender);
} }
public interface TestRpc extends IpcProtocol { public interface TestRpc extends IpcProtocol {
@ -178,7 +189,6 @@ public class TestDelayedRpc {
/** /**
* @param delayReturnValue Should the response to the delayed call be set * @param delayReturnValue Should the response to the delayed call be set
* at the start or the end of the delay. * at the start or the end of the delay.
* @param delay Amount of milliseconds to delay the call by
*/ */
public TestRpcImpl(boolean delayReturnValue) { public TestRpcImpl(boolean delayReturnValue) {
this.delayReturnValue = delayReturnValue; this.delayReturnValue = delayReturnValue;
@ -251,29 +261,34 @@ public class TestDelayedRpc {
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start(); rpcServer.start();
TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf);
rpcServer.getListenerAddress(), conf, 1000);
int result = 0xDEADBEEF;
try { try {
result = client.test(TestArg.newBuilder().setDelay(false).build()).getResponse(); TestRpc client = clientEngine.getProxy(TestRpc.class,
} catch (Exception e) { rpcServer.getListenerAddress(), conf, 1000);
fail("No exception should have been thrown.");
}
assertEquals(result, UNDELAYED);
boolean caughtException = false; int result = 0xDEADBEEF;
try {
result = client.test(TestArg.newBuilder().setDelay(true).build()).getResponse(); try {
} catch(Exception e) { result = client.test(TestArg.newBuilder().setDelay(false).build()).getResponse();
// Exception thrown by server is enclosed in a RemoteException. } catch (Exception e) {
if (e.getCause().getMessage().contains( fail("No exception should have been thrown.");
"java.lang.Exception: Something went wrong")) }
caughtException = true; assertEquals(result, UNDELAYED);
Log.warn(e);
boolean caughtException = false;
try {
result = client.test(TestArg.newBuilder().setDelay(true).build()).getResponse();
} catch(Exception e) {
// Exception thrown by server is enclosed in a RemoteException.
if (e.getCause().getMessage().contains(
"java.lang.Exception: Something went wrong"))
caughtException = true;
Log.warn(e);
}
assertTrue(caughtException);
} finally {
clientEngine.close();
} }
assertTrue(caughtException);
} }
/** /**

View File

@ -82,9 +82,6 @@ public class TestProtoBufRpc {
@Before @Before
public void setUp() throws IOException { // Setup server for both protocols public void setUp() throws IOException { // Setup server for both protocols
conf = new Configuration(); conf = new Configuration();
// Set RPC engine to protobuf RPC engine
HBaseClientRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcClientEngine.class);
HBaseServerRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcServerEngine.class);
// Create server side implementation // Create server side implementation
PBServerImpl serverImpl = new PBServerImpl(); PBServerImpl serverImpl = new PBServerImpl();
@ -102,38 +99,29 @@ public class TestProtoBufRpc {
server.stop(); server.stop();
} }
private static TestRpcService getClient() throws IOException {
// Set RPC engine to protobuf RPC engine
HBaseClientRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcClientEngine.class);
HBaseServerRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcServerEngine.class);
return (TestRpcService) HBaseClientRPC.getProxy(TestRpcService.class,
addr, conf, 10000);
}
@Test @Test
public void testProtoBufRpc() throws Exception { public void testProtoBufRpc() throws Exception {
TestRpcService client = getClient(); ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf);
testProtoBufRpc(client);
}
// separated test out so that other tests can call it.
public static void testProtoBufRpc(TestRpcService client) throws Exception {
// Test ping method
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
client.ping(null, emptyRequest);
// Test echo method
EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
.setMessage("hello").build();
EchoResponseProto echoResponse = client.echo(null, echoRequest);
Assert.assertEquals(echoResponse.getMessage(), "hello");
// Test error method - error should be thrown as RemoteException
try { try {
client.error(null, emptyRequest); TestRpcService client = clientEngine.getProxy(TestRpcService.class, addr, conf, 10000);
Assert.fail("Expected exception is not thrown"); // Test ping method
} catch (ServiceException e) { EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
client.ping(null, emptyRequest);
// Test echo method
EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
.setMessage("hello").build();
EchoResponseProto echoResponse = client.echo(null, echoRequest);
Assert.assertEquals(echoResponse.getMessage(), "hello");
// Test error method - error should be thrown as RemoteException
try {
client.error(null, emptyRequest);
Assert.fail("Expected exception is not thrown");
} catch (ServiceException e) {
}
} finally {
clientEngine.close();
} }
} }
} }

View File

@ -160,7 +160,7 @@ public class TestCatalogJanitor {
this.ct.stop(); this.ct.stop();
} }
if (this.connection != null) { if (this.connection != null) {
HConnectionManager.deleteConnection(this.connection.getConfiguration(), true); HConnectionManager.deleteConnection(this.connection.getConfiguration());
} }
} }
} }

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC; import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.MasterMonitorProtocol; import org.apache.hadoop.hbase.MasterMonitorProtocol;
import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.junit.Test; import org.junit.Test;
@ -50,31 +51,36 @@ public class TestHMasterRPCException {
ServerName sm = hm.getServerName(); ServerName sm = hm.getServerName();
InetSocketAddress isa = new InetSocketAddress(sm.getHostname(), sm.getPort()); InetSocketAddress isa = new InetSocketAddress(sm.getHostname(), sm.getPort());
int i = 0; ProtobufRpcClientEngine engine = new ProtobufRpcClientEngine(conf);
//retry the RPC a few times; we have seen SocketTimeoutExceptions if we try {
//try to connect too soon. Retry on SocketTimeoutException. int i = 0;
while (i < 20) { //retry the RPC a few times; we have seen SocketTimeoutExceptions if we
try { //try to connect too soon. Retry on SocketTimeoutException.
MasterMonitorProtocol inf = (MasterMonitorProtocol) HBaseClientRPC.getProxy( while (i < 20) {
MasterMonitorProtocol.class, isa, conf, 100 * 10); try {
inf.isMasterRunning(null, IsMasterRunningRequest.getDefaultInstance()); MasterMonitorProtocol inf = engine.getProxy(
fail(); MasterMonitorProtocol.class, isa, conf, 100 * 10);
} catch (ServiceException ex) { inf.isMasterRunning(null, IsMasterRunningRequest.getDefaultInstance());
IOException ie = ProtobufUtil.getRemoteException(ex); fail();
if (!(ie instanceof SocketTimeoutException)) { } catch (ServiceException ex) {
if(ie.getMessage().startsWith( IOException ie = ProtobufUtil.getRemoteException(ex);
"org.apache.hadoop.hbase.ipc.ServerNotRunningYetException: Server is not running yet")) { if (!(ie instanceof SocketTimeoutException)) {
return; if(ie.getMessage().startsWith(
"org.apache.hadoop.hbase.ipc.ServerNotRunningYetException: Server is not running yet")) {
return;
}
} else {
System.err.println("Got SocketTimeoutException. Will retry. ");
} }
} else { } catch (Throwable t) {
System.err.println("Got SocketTimeoutException. Will retry. "); fail("Unexpected throwable: " + t);
} }
} catch (Throwable t) { Thread.sleep(100);
fail("Unexpected throwable: " + t); i++;
} }
Thread.sleep(100); fail();
i++; } finally {
engine.close();
} }
fail();
} }
} }

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC; import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HBaseServerRPC; import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
import org.apache.hadoop.hbase.ipc.RequestContext; import org.apache.hadoop.hbase.ipc.RequestContext;
import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.ipc.ServerRpcController;
@ -363,20 +364,25 @@ public class TestTokenAuthentication {
public Object run() throws Exception { public Object run() throws Exception {
Configuration c = server.getConfiguration(); Configuration c = server.getConfiguration();
c.set(HConstants.CLUSTER_ID, clusterId.toString()); c.set(HConstants.CLUSTER_ID, clusterId.toString());
AuthenticationProtos.AuthenticationService.BlockingInterface proxy = ProtobufRpcClientEngine rpcClient =
(AuthenticationProtos.AuthenticationService.BlockingInterface) new ProtobufRpcClientEngine(c);
HBaseClientRPC.waitForProxy(BlockingAuthenticationService.class, try {
server.getAddress(), c, AuthenticationProtos.AuthenticationService.BlockingInterface proxy =
HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS, HBaseClientRPC.waitForProxy(rpcClient, BlockingAuthenticationService.class,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT, server.getAddress(), c,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT); HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
AuthenticationProtos.WhoAmIResponse response = AuthenticationProtos.WhoAmIResponse response =
proxy.whoami(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance()); proxy.whoami(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
String myname = response.getUsername(); String myname = response.getUsername();
assertEquals("testuser", myname); assertEquals("testuser", myname);
String authMethod = response.getAuthMethod(); String authMethod = response.getAuthMethod();
assertEquals("TOKEN", authMethod); assertEquals("TOKEN", authMethod);
} finally {
rpcClient.close();
}
return null; return null;
} }
}); });

View File

@ -108,7 +108,7 @@ public class OfflineMetaRebuildTestCore {
@After @After
public void tearDownAfter() throws Exception { public void tearDownAfter() throws Exception {
TEST_UTIL.shutdownMiniCluster(); TEST_UTIL.shutdownMiniCluster();
HConnectionManager.deleteConnection(conf, true); HConnectionManager.deleteConnection(conf);
} }
/** /**

View File

@ -57,7 +57,7 @@ public class TestOfflineMetaRebuildBase extends OfflineMetaRebuildTestCore {
// shutdown the minicluster // shutdown the minicluster
TEST_UTIL.shutdownMiniHBaseCluster(); TEST_UTIL.shutdownMiniHBaseCluster();
TEST_UTIL.shutdownMiniZKCluster(); TEST_UTIL.shutdownMiniZKCluster();
HConnectionManager.deleteConnection(conf, false); HConnectionManager.deleteConnection(conf);
// rebuild meta table from scratch // rebuild meta table from scratch
HBaseFsck fsck = new HBaseFsck(conf); HBaseFsck fsck = new HBaseFsck(conf);