From a74fa20066e7ed447d854c52f5ea75c032e3e045 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 1 Nov 2010 18:01:57 +0000 Subject: [PATCH] HBASE-3154 HBase RPC should support timeout (Hairong via jgray) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1029776 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../org/apache/hadoop/hbase/HConstants.java | 10 ++++++ .../hbase/client/HConnectionManager.java | 12 +++---- .../apache/hadoop/hbase/ipc/HBaseClient.java | 31 ++++++++++++------- .../org/apache/hadoop/hbase/ipc/HBaseRPC.java | 27 ++++++++++------ .../hbase/regionserver/HRegionServer.java | 10 +++--- 6 files changed, 60 insertions(+), 31 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 87390969827..7a9fe8f321b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1079,6 +1079,7 @@ Release 0.21.0 - Unreleased cacheBlocks=true HBASE-3126 Force use of 'mv -f' when moving aside hbase logfiles HBASE-3176 Remove compile warnings in HRegionServer + HBASE-3154 HBase RPC should support timeout (Hairong via jgray) NEW FEATURES diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java index 71c3e7bf471..70de08a6db9 100644 --- a/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -365,6 +365,16 @@ public final class HConstants { * Default value of {@link #HBASE_REGIONSERVER_LEASE_PERIOD_KEY}. */ public static long DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD = 60000; + + /** + * timeout for each RPC + */ + public static String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout"; + + /** + * Default value of {@link #HBASE_RPC_TIMEOUT_KEY} + */ + public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000; public static final String REPLICATION_ENABLE_KEY = "hbase.replication"; diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 35fe5c4e98f..1838b8d3096 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -224,7 +224,7 @@ public class HConnectionManager { private final long pause; private final int numRetries; private final int maxRPCAttempts; - private final long rpcTimeout; + private final int rpcTimeout; private final int prefetchRegionLimit; private final Object masterLock = new Object(); @@ -282,9 +282,9 @@ public class HConnectionManager { this.pause = conf.getLong("hbase.client.pause", 1000); this.numRetries = conf.getInt("hbase.client.retries.number", 10); this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1); - this.rpcTimeout = conf.getLong( - HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, - HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); + this.rpcTimeout = conf.getInt( + HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit", 10); @@ -341,7 +341,7 @@ public class HConnectionManager { HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy( HMasterInterface.class, HBaseRPCProtocolVersion.versionID, - masterLocation.getInetSocketAddress(), this.conf); + masterLocation.getInetSocketAddress(), this.conf, this.rpcTimeout); if (tryMaster.isMasterRunning()) { this.master = tryMaster; @@ -936,7 +936,7 @@ public class HConnectionManager { server = (HRegionInterface)HBaseRPC.waitForProxy( serverInterfaceClass, HBaseRPCProtocolVersion.versionID, regionServer.getInetSocketAddress(), this.conf, - this.maxRPCAttempts, this.rpcTimeout); + this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout); } catch (RemoteException e) { LOG.warn("Remove exception connecting to RS", e); throw RemoteExceptionHandler.decodeRemoteException(e); diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index ac44b40da71..f8d8af73da0 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -79,7 +79,7 @@ public class HBaseClient { final protected long failureSleep; // Time to sleep before retry on failure. protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpKeepAlive; // if T then use keepalives - protected final int pingInterval; // how often sends ping to the server in msecs + protected int pingInterval; // how often sends ping to the server in msecs protected final SocketFactory socketFactory; // how to create sockets private int refCount = 1; @@ -194,7 +194,7 @@ public class HBaseClient { private IOException closeException; // close reason public Connection(InetSocketAddress address) throws IOException { - this(new ConnectionId(address, null)); + this(new ConnectionId(address, null, 0)); } public Connection(ConnectionId remoteId) throws IOException { @@ -245,7 +245,8 @@ public class HBaseClient { * otherwise, throw the timeout exception. */ private void handleTimeout(SocketTimeoutException e) throws IOException { - if (shouldCloseConnection.get() || !running.get()) { + if (shouldCloseConnection.get() || !running.get() || + remoteId.rpcTimeout > 0) { throw e; } sendPing(); @@ -308,6 +309,9 @@ public class HBaseClient { this.socket.setKeepAlive(tcpKeepAlive); // connection time out is 20s NetUtils.connect(this.socket, remoteId.getAddress(), 20000); + if (remoteId.rpcTimeout > 0) { + pingInterval = remoteId.rpcTimeout; // overwrite pingInterval + } this.socket.setSoTimeout(pingInterval); break; } catch (SocketTimeoutException toe) { @@ -718,14 +722,14 @@ public class HBaseClient { */ public Writable call(Writable param, InetSocketAddress address) throws IOException { - return call(param, address, null); + return call(param, address, null, 0); } public Writable call(Writable param, InetSocketAddress addr, - UserGroupInformation ticket) + UserGroupInformation ticket, int rpcTimeout) throws IOException { Call call = new Call(param); - Connection connection = getConnection(addr, ticket, call); + Connection connection = getConnection(addr, ticket, rpcTimeout, call); connection.sendParam(call); // send the parameter boolean interrupted = false; //noinspection SynchronizationOnLocalVariableOrMethodParameter @@ -808,7 +812,7 @@ public class HBaseClient { for (int i = 0; i < params.length; i++) { ParallelCall call = new ParallelCall(params[i], results, i); try { - Connection connection = getConnection(addresses[i], null, call); + Connection connection = getConnection(addresses[i], null, 0, call); connection.sendParam(call); // send each parameter } catch (IOException e) { // log errors @@ -831,6 +835,7 @@ public class HBaseClient { * pool. Connections to a given host/port are reused. */ private Connection getConnection(InetSocketAddress addr, UserGroupInformation ticket, + int rpcTimeout, Call call) throws IOException { if (!running.get()) { @@ -842,7 +847,7 @@ public class HBaseClient { * connectionsId object and with set() method. We need to manage the * refs for keys in HashMap properly. For now its ok. */ - ConnectionId remoteId = new ConnectionId(addr, ticket); + ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout); do { synchronized (connections) { connection = connections.get(remoteId); @@ -868,10 +873,13 @@ public class HBaseClient { private static class ConnectionId { final InetSocketAddress address; final UserGroupInformation ticket; + final private int rpcTimeout; - ConnectionId(InetSocketAddress address, UserGroupInformation ticket) { + ConnectionId(InetSocketAddress address, UserGroupInformation ticket, + int rpcTimeout) { this.address = address; this.ticket = ticket; + this.rpcTimeout = rpcTimeout; } InetSocketAddress getAddress() { @@ -885,7 +893,8 @@ public class HBaseClient { public boolean equals(Object obj) { if (obj instanceof ConnectionId) { ConnectionId id = (ConnectionId) obj; - return address.equals(id.address) && ticket == id.ticket; + return address.equals(id.address) && ticket == id.ticket && + rpcTimeout == id.rpcTimeout; //Note : ticket is a ref comparision. } return false; @@ -893,7 +902,7 @@ public class HBaseClient { @Override public int hashCode() { - return address.hashCode() ^ System.identityHashCode(ticket); + return address.hashCode() ^ System.identityHashCode(ticket) ^ rpcTimeout; } } } diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java index e23a629dd9e..4f4828bdaf2 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java @@ -231,6 +231,7 @@ public class HBaseRPC { private UserGroupInformation ticket; private HBaseClient client; private boolean isClosed = false; + final private int rpcTimeout; /** * @param address address for invoker @@ -239,10 +240,11 @@ public class HBaseRPC { * @param factory socket factory */ public Invoker(InetSocketAddress address, UserGroupInformation ticket, - Configuration conf, SocketFactory factory) { + Configuration conf, SocketFactory factory, int rpcTimeout) { this.address = address; this.ticket = ticket; this.client = CLIENTS.getClient(conf, factory); + this.rpcTimeout = rpcTimeout; } public Object invoke(Object proxy, Method method, Object[] args) @@ -253,7 +255,7 @@ public class HBaseRPC { startTime = System.currentTimeMillis(); } HbaseObjectWritable value = (HbaseObjectWritable) - client.call(new Invocation(method, args), address, ticket); + client.call(new Invocation(method, args), address, ticket, rpcTimeout); if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); @@ -324,6 +326,7 @@ public class HBaseRPC { * @param addr address of remote service * @param conf configuration * @param maxAttempts max attempts + * @param rpcTimeout timeout for each RPC * @param timeout timeout in milliseconds * @return proxy * @throws IOException e @@ -334,6 +337,7 @@ public class HBaseRPC { InetSocketAddress addr, Configuration conf, int maxAttempts, + int rpcTimeout, long timeout ) throws IOException { // HBase does limited number of reconnects which is different from hadoop. @@ -342,7 +346,7 @@ public class HBaseRPC { int reconnectAttempts = 0; while (true) { try { - return getProxy(protocol, clientVersion, addr, conf); + return getProxy(protocol, clientVersion, addr, conf, rpcTimeout); } catch(ConnectException se) { // namenode has not been started ioe = se; if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) { @@ -379,13 +383,15 @@ public class HBaseRPC { * @param addr remote address * @param conf configuration * @param factory socket factory + * @param rpcTimeout timeout for each RPC * @return proxy * @throws IOException e */ public static VersionedProtocol getProxy(Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf, - SocketFactory factory) throws IOException { - return getProxy(protocol, clientVersion, addr, null, conf, factory); + SocketFactory factory, int rpcTimeout) throws IOException { + return getProxy(protocol, clientVersion, addr, null, conf, factory, + rpcTimeout); } /** @@ -398,17 +404,18 @@ public class HBaseRPC { * @param ticket ticket * @param conf configuration * @param factory socket factory + * @param rpcTimeout timeout for each RPC * @return proxy * @throws IOException e */ public static VersionedProtocol getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, - Configuration conf, SocketFactory factory) + Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[] { protocol }, - new Invoker(addr, ticket, conf, factory)); + new Invoker(addr, ticket, conf, factory, rpcTimeout)); long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion); if (serverVersion == clientVersion) { @@ -425,15 +432,17 @@ public class HBaseRPC { * @param clientVersion version we are expecting * @param addr remote address * @param conf configuration + * @param rpcTimeout timeout for each RPC * @return a proxy instance * @throws IOException e */ public static VersionedProtocol getProxy(Class protocol, - long clientVersion, InetSocketAddress addr, Configuration conf) + long clientVersion, InetSocketAddress addr, Configuration conf, + int rpcTimeout) throws IOException { return getProxy(protocol, clientVersion, addr, conf, NetUtils - .getDefaultSocketFactory(conf)); + .getDefaultSocketFactory(conf), rpcTimeout); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index f637f5234c7..90ae20f42c3 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -240,7 +240,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // A sleeper that sleeps for msgInterval. private final Sleeper sleeper; - private final long rpcTimeout; + private final int rpcTimeout; // The main region server thread. @SuppressWarnings("unused") @@ -292,9 +292,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, this.numRegionsToReport = conf.getInt( "hbase.regionserver.numregionstoreport", 10); - this.rpcTimeout = conf.getLong( - HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, - HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); + this.rpcTimeout = conf.getInt( + HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.abortRequested = false; this.stopped = false; @@ -1363,7 +1363,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, master = (HMasterRegionInterface) HBaseRPC.waitForProxy( HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID, masterAddress.getInetSocketAddress(), this.conf, -1, - this.rpcTimeout); + this.rpcTimeout, this.rpcTimeout); } catch (IOException e) { LOG.warn("Unable to connect to master. Retrying. Error was:", e); sleeper.sleep();