HBASE-3154 HBase RPC should support timeout (Hairong via jgray)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1029776 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Gray 2010-11-01 18:01:57 +00:00
parent 2fbceee800
commit a74fa20066
6 changed files with 60 additions and 31 deletions

View File

@ -1079,6 +1079,7 @@ Release 0.21.0 - Unreleased
cacheBlocks=true
HBASE-3126 Force use of 'mv -f' when moving aside hbase logfiles
HBASE-3176 Remove compile warnings in HRegionServer
HBASE-3154 HBase RPC should support timeout (Hairong via jgray)
NEW FEATURES

View File

@ -366,6 +366,16 @@ public final class HConstants {
*/
public static long DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD = 60000;
/**
* timeout for each RPC
*/
public static String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
/**
* Default value of {@link #HBASE_RPC_TIMEOUT_KEY}
*/
public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000;
public static final String
REPLICATION_ENABLE_KEY = "hbase.replication";

View File

@ -224,7 +224,7 @@ public class HConnectionManager {
private final long pause;
private final int numRetries;
private final int maxRPCAttempts;
private final long rpcTimeout;
private final int rpcTimeout;
private final int prefetchRegionLimit;
private final Object masterLock = new Object();
@ -282,9 +282,9 @@ public class HConnectionManager {
this.pause = conf.getLong("hbase.client.pause", 1000);
this.numRetries = conf.getInt("hbase.client.retries.number", 10);
this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1);
this.rpcTimeout = conf.getLong(
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
this.rpcTimeout = conf.getInt(
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit",
10);
@ -341,7 +341,7 @@ public class HConnectionManager {
HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
masterLocation.getInetSocketAddress(), this.conf);
masterLocation.getInetSocketAddress(), this.conf, this.rpcTimeout);
if (tryMaster.isMasterRunning()) {
this.master = tryMaster;
@ -936,7 +936,7 @@ public class HConnectionManager {
server = (HRegionInterface)HBaseRPC.waitForProxy(
serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
regionServer.getInetSocketAddress(), this.conf,
this.maxRPCAttempts, this.rpcTimeout);
this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
} catch (RemoteException e) {
LOG.warn("Remove exception connecting to RS", e);
throw RemoteExceptionHandler.decodeRemoteException(e);

View File

@ -79,7 +79,7 @@ public class HBaseClient {
final protected long failureSleep; // Time to sleep before retry on failure.
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives
protected final int pingInterval; // how often sends ping to the server in msecs
protected int pingInterval; // how often sends ping to the server in msecs
protected final SocketFactory socketFactory; // how to create sockets
private int refCount = 1;
@ -194,7 +194,7 @@ public class HBaseClient {
private IOException closeException; // close reason
public Connection(InetSocketAddress address) throws IOException {
this(new ConnectionId(address, null));
this(new ConnectionId(address, null, 0));
}
public Connection(ConnectionId remoteId) throws IOException {
@ -245,7 +245,8 @@ public class HBaseClient {
* otherwise, throw the timeout exception.
*/
private void handleTimeout(SocketTimeoutException e) throws IOException {
if (shouldCloseConnection.get() || !running.get()) {
if (shouldCloseConnection.get() || !running.get() ||
remoteId.rpcTimeout > 0) {
throw e;
}
sendPing();
@ -308,6 +309,9 @@ public class HBaseClient {
this.socket.setKeepAlive(tcpKeepAlive);
// connection time out is 20s
NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
if (remoteId.rpcTimeout > 0) {
pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
}
this.socket.setSoTimeout(pingInterval);
break;
} catch (SocketTimeoutException toe) {
@ -718,14 +722,14 @@ public class HBaseClient {
*/
public Writable call(Writable param, InetSocketAddress address)
throws IOException {
return call(param, address, null);
return call(param, address, null, 0);
}
public Writable call(Writable param, InetSocketAddress addr,
UserGroupInformation ticket)
UserGroupInformation ticket, int rpcTimeout)
throws IOException {
Call call = new Call(param);
Connection connection = getConnection(addr, ticket, call);
Connection connection = getConnection(addr, ticket, rpcTimeout, call);
connection.sendParam(call); // send the parameter
boolean interrupted = false;
//noinspection SynchronizationOnLocalVariableOrMethodParameter
@ -808,7 +812,7 @@ public class HBaseClient {
for (int i = 0; i < params.length; i++) {
ParallelCall call = new ParallelCall(params[i], results, i);
try {
Connection connection = getConnection(addresses[i], null, call);
Connection connection = getConnection(addresses[i], null, 0, call);
connection.sendParam(call); // send each parameter
} catch (IOException e) {
// log errors
@ -831,6 +835,7 @@ public class HBaseClient {
* pool. Connections to a given host/port are reused. */
private Connection getConnection(InetSocketAddress addr,
UserGroupInformation ticket,
int rpcTimeout,
Call call)
throws IOException {
if (!running.get()) {
@ -842,7 +847,7 @@ public class HBaseClient {
* connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok.
*/
ConnectionId remoteId = new ConnectionId(addr, ticket);
ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout);
do {
synchronized (connections) {
connection = connections.get(remoteId);
@ -868,10 +873,13 @@ public class HBaseClient {
private static class ConnectionId {
final InetSocketAddress address;
final UserGroupInformation ticket;
final private int rpcTimeout;
ConnectionId(InetSocketAddress address, UserGroupInformation ticket) {
ConnectionId(InetSocketAddress address, UserGroupInformation ticket,
int rpcTimeout) {
this.address = address;
this.ticket = ticket;
this.rpcTimeout = rpcTimeout;
}
InetSocketAddress getAddress() {
@ -885,7 +893,8 @@ public class HBaseClient {
public boolean equals(Object obj) {
if (obj instanceof ConnectionId) {
ConnectionId id = (ConnectionId) obj;
return address.equals(id.address) && ticket == id.ticket;
return address.equals(id.address) && ticket == id.ticket &&
rpcTimeout == id.rpcTimeout;
//Note : ticket is a ref comparision.
}
return false;
@ -893,7 +902,7 @@ public class HBaseClient {
@Override
public int hashCode() {
return address.hashCode() ^ System.identityHashCode(ticket);
return address.hashCode() ^ System.identityHashCode(ticket) ^ rpcTimeout;
}
}
}

View File

@ -231,6 +231,7 @@ public class HBaseRPC {
private UserGroupInformation ticket;
private HBaseClient client;
private boolean isClosed = false;
final private int rpcTimeout;
/**
* @param address address for invoker
@ -239,10 +240,11 @@ public class HBaseRPC {
* @param factory socket factory
*/
public Invoker(InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory) {
Configuration conf, SocketFactory factory, int rpcTimeout) {
this.address = address;
this.ticket = ticket;
this.client = CLIENTS.getClient(conf, factory);
this.rpcTimeout = rpcTimeout;
}
public Object invoke(Object proxy, Method method, Object[] args)
@ -253,7 +255,7 @@ public class HBaseRPC {
startTime = System.currentTimeMillis();
}
HbaseObjectWritable value = (HbaseObjectWritable)
client.call(new Invocation(method, args), address, ticket);
client.call(new Invocation(method, args), address, ticket, rpcTimeout);
if (logDebug) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
@ -324,6 +326,7 @@ public class HBaseRPC {
* @param addr address of remote service
* @param conf configuration
* @param maxAttempts max attempts
* @param rpcTimeout timeout for each RPC
* @param timeout timeout in milliseconds
* @return proxy
* @throws IOException e
@ -334,6 +337,7 @@ public class HBaseRPC {
InetSocketAddress addr,
Configuration conf,
int maxAttempts,
int rpcTimeout,
long timeout
) throws IOException {
// HBase does limited number of reconnects which is different from hadoop.
@ -342,7 +346,7 @@ public class HBaseRPC {
int reconnectAttempts = 0;
while (true) {
try {
return getProxy(protocol, clientVersion, addr, conf);
return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
} catch(ConnectException se) { // namenode has not been started
ioe = se;
if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) {
@ -379,13 +383,15 @@ public class HBaseRPC {
* @param addr remote address
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout timeout for each RPC
* @return proxy
* @throws IOException e
*/
public static VersionedProtocol getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf,
SocketFactory factory) throws IOException {
return getProxy(protocol, clientVersion, addr, null, conf, factory);
SocketFactory factory, int rpcTimeout) throws IOException {
return getProxy(protocol, clientVersion, addr, null, conf, factory,
rpcTimeout);
}
/**
@ -398,17 +404,18 @@ public class HBaseRPC {
* @param ticket ticket
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout timeout for each RPC
* @return proxy
* @throws IOException e
*/
public static VersionedProtocol getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory)
Configuration conf, SocketFactory factory, int rpcTimeout)
throws IOException {
VersionedProtocol proxy =
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
new Invoker(addr, ticket, conf, factory));
new Invoker(addr, ticket, conf, factory, rpcTimeout));
long serverVersion = proxy.getProtocolVersion(protocol.getName(),
clientVersion);
if (serverVersion == clientVersion) {
@ -425,15 +432,17 @@ public class HBaseRPC {
* @param clientVersion version we are expecting
* @param addr remote address
* @param conf configuration
* @param rpcTimeout timeout for each RPC
* @return a proxy instance
* @throws IOException e
*/
public static VersionedProtocol getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf)
long clientVersion, InetSocketAddress addr, Configuration conf,
int rpcTimeout)
throws IOException {
return getProxy(protocol, clientVersion, addr, conf, NetUtils
.getDefaultSocketFactory(conf));
.getDefaultSocketFactory(conf), rpcTimeout);
}
/**

View File

@ -240,7 +240,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// A sleeper that sleeps for msgInterval.
private final Sleeper sleeper;
private final long rpcTimeout;
private final int rpcTimeout;
// The main region server thread.
@SuppressWarnings("unused")
@ -292,9 +292,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
this.numRegionsToReport = conf.getInt(
"hbase.regionserver.numregionstoreport", 10);
this.rpcTimeout = conf.getLong(
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
this.rpcTimeout = conf.getInt(
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.abortRequested = false;
this.stopped = false;
@ -1363,7 +1363,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
master = (HMasterRegionInterface) HBaseRPC.waitForProxy(
HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
masterAddress.getInetSocketAddress(), this.conf, -1,
this.rpcTimeout);
this.rpcTimeout, this.rpcTimeout);
} catch (IOException e) {
LOG.warn("Unable to connect to master. Retrying. Error was:", e);
sleeper.sleep();