diff --git a/CHANGES.txt b/CHANGES.txt index 21c60235fb3..70366e5a2e9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -77,6 +77,7 @@ Release 0.91.0 - Unreleased Export (Subbu M Iyer via Stack) HBASE-3440 Clean out load_table.rb and make sure all roads lead to completebulkload tool (Vidhyashankar Venkataraman via Stack) + HBASE-3653 Parallelize Server Requests on HBase Client TASK HBASE-3559 Move report of split to master OFF the heartbeat channel diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 3d2c5ea8b89..644de1fa6bd 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -240,6 +240,7 @@ public class HConnectionManager { private final Map servers = new ConcurrentHashMap(); + private final ConcurrentHashMap connectionLock = new ConcurrentHashMap(); /** * Map of table to table {@link HRegionLocation}s. The table key is made @@ -941,21 +942,30 @@ public class HConnectionManager { getMaster(); } HRegionInterface server; - synchronized (this.servers) { - // See if we already have a connection - server = this.servers.get(regionServer.toString()); - if (server == null) { // Get a connection - try { - server = (HRegionInterface)HBaseRPC.waitForProxy( - serverInterfaceClass, HRegionInterface.VERSION, - regionServer.getInetSocketAddress(), this.conf, - this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout); - } catch (RemoteException e) { - LOG.warn("RemoteException connecting to RS", e); - // Throw what the RemoteException was carrying. - throw RemoteExceptionHandler.decodeRemoteException(e); + String rsName = regionServer.toString(); + // See if we already have a connection (common case) + server = this.servers.get(rsName); + if (server == null) { + // create a unique lock for this RS (if necessary) + this.connectionLock.putIfAbsent(rsName, rsName); + // get the RS lock + synchronized (this.connectionLock.get(rsName)) { + // do one more lookup in case we were stalled above + server = this.servers.get(rsName); + if (server == null) { + try { + // definitely a cache miss. establish an RPC for this RS + server = (HRegionInterface) HBaseRPC.waitForProxy( + serverInterfaceClass, HRegionInterface.VERSION, + regionServer.getInetSocketAddress(), this.conf, + this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout); + this.servers.put(rsName, server); + } catch (RemoteException e) { + LOG.warn("RemoteException connecting to RS", e); + // Throw what the RemoteException was carrying. + throw RemoteExceptionHandler.decodeRemoteException(e); + } } - this.servers.put(regionServer.toString(), server); } } return server;