HBASE-3653 : Parallelize Server Requests on HBase Client
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1082648 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a50c7d8dfc
commit
b9e1d6c698
|
@ -77,6 +77,7 @@ Release 0.91.0 - Unreleased
|
|||
Export (Subbu M Iyer via Stack)
|
||||
HBASE-3440 Clean out load_table.rb and make sure all roads lead to
|
||||
completebulkload tool (Vidhyashankar Venkataraman via Stack)
|
||||
HBASE-3653 Parallelize Server Requests on HBase Client
|
||||
|
||||
TASK
|
||||
HBASE-3559 Move report of split to master OFF the heartbeat channel
|
||||
|
|
|
@ -240,6 +240,7 @@ public class HConnectionManager {
|
|||
|
||||
private final Map<String, HRegionInterface> servers =
|
||||
new ConcurrentHashMap<String, HRegionInterface>();
|
||||
private final ConcurrentHashMap<String, String> connectionLock = new ConcurrentHashMap<String, String>();
|
||||
|
||||
/**
|
||||
* Map of table to table {@link HRegionLocation}s. The table key is made
|
||||
|
@ -941,21 +942,30 @@ public class HConnectionManager {
|
|||
getMaster();
|
||||
}
|
||||
HRegionInterface server;
|
||||
synchronized (this.servers) {
|
||||
// See if we already have a connection
|
||||
server = this.servers.get(regionServer.toString());
|
||||
if (server == null) { // Get a connection
|
||||
try {
|
||||
server = (HRegionInterface)HBaseRPC.waitForProxy(
|
||||
serverInterfaceClass, HRegionInterface.VERSION,
|
||||
regionServer.getInetSocketAddress(), this.conf,
|
||||
this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
|
||||
} catch (RemoteException e) {
|
||||
LOG.warn("RemoteException connecting to RS", e);
|
||||
// Throw what the RemoteException was carrying.
|
||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
||||
String rsName = regionServer.toString();
|
||||
// See if we already have a connection (common case)
|
||||
server = this.servers.get(rsName);
|
||||
if (server == null) {
|
||||
// create a unique lock for this RS (if necessary)
|
||||
this.connectionLock.putIfAbsent(rsName, rsName);
|
||||
// get the RS lock
|
||||
synchronized (this.connectionLock.get(rsName)) {
|
||||
// do one more lookup in case we were stalled above
|
||||
server = this.servers.get(rsName);
|
||||
if (server == null) {
|
||||
try {
|
||||
// definitely a cache miss. establish an RPC for this RS
|
||||
server = (HRegionInterface) HBaseRPC.waitForProxy(
|
||||
serverInterfaceClass, HRegionInterface.VERSION,
|
||||
regionServer.getInetSocketAddress(), this.conf,
|
||||
this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
|
||||
this.servers.put(rsName, server);
|
||||
} catch (RemoteException e) {
|
||||
LOG.warn("RemoteException connecting to RS", e);
|
||||
// Throw what the RemoteException was carrying.
|
||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
||||
}
|
||||
}
|
||||
this.servers.put(regionServer.toString(), server);
|
||||
}
|
||||
}
|
||||
return server;
|
||||
|
|
Loading…
Reference in New Issue