HBASE-9987 Remove some synchronisation points in HConnectionManager
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1543051 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ce8aeae6db
commit
fb7fb4fa41
|
@ -25,7 +25,6 @@ import java.lang.reflect.UndeclaredThrowableException;
|
||||||
import java.net.SocketException;
|
import java.net.SocketException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -36,6 +35,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListSet;
|
||||||
import java.util.concurrent.CopyOnWriteArraySet;
|
import java.util.concurrent.CopyOnWriteArraySet;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
@ -606,16 +606,16 @@ public class HConnectionManager {
|
||||||
/**
|
/**
|
||||||
* Map of table to table {@link HRegionLocation}s.
|
* Map of table to table {@link HRegionLocation}s.
|
||||||
*/
|
*/
|
||||||
private final Map<TableName, ConcurrentSkipListMap<byte[], HRegionLocation>>
|
private final ConcurrentMap<TableName, ConcurrentSkipListMap<byte[], HRegionLocation>>
|
||||||
cachedRegionLocations =
|
cachedRegionLocations =
|
||||||
new HashMap<TableName, ConcurrentSkipListMap<byte[], HRegionLocation>>();
|
new ConcurrentHashMap<TableName, ConcurrentSkipListMap<byte[], HRegionLocation>>();
|
||||||
|
|
||||||
// The presence of a server in the map implies it's likely that there is an
|
// The presence of a server in the map implies it's likely that there is an
|
||||||
// entry in cachedRegionLocations that map to this server; but the absence
|
// entry in cachedRegionLocations that map to this server; but the absence
|
||||||
// of a server in this map guarentees that there is no entry in cache that
|
// of a server in this map guarentees that there is no entry in cache that
|
||||||
// maps to the absent server.
|
// maps to the absent server.
|
||||||
// The access to this attribute must be protected by a lock on cachedRegionLocations
|
// The access to this attribute must be protected by a lock on cachedRegionLocations
|
||||||
private final Set<ServerName> cachedServers = new HashSet<ServerName>();
|
private final Set<ServerName> cachedServers = new ConcurrentSkipListSet<ServerName>();
|
||||||
|
|
||||||
// region cache prefetch is enabled by default. this set contains all
|
// region cache prefetch is enabled by default. this set contains all
|
||||||
// tables whose region cache prefetch are disabled.
|
// tables whose region cache prefetch are disabled.
|
||||||
|
@ -1352,7 +1352,6 @@ public class HConnectionManager {
|
||||||
*/
|
*/
|
||||||
void forceDeleteCachedLocation(final TableName tableName, final byte [] row) {
|
void forceDeleteCachedLocation(final TableName tableName, final byte [] row) {
|
||||||
HRegionLocation rl = null;
|
HRegionLocation rl = null;
|
||||||
synchronized (this.cachedRegionLocations) {
|
|
||||||
Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
|
Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
|
||||||
// start to examine the cache. we can only do cache actions
|
// start to examine the cache. we can only do cache actions
|
||||||
// if there's something in the cache for this table.
|
// if there's something in the cache for this table.
|
||||||
|
@ -1360,7 +1359,6 @@ public class HConnectionManager {
|
||||||
if (rl != null) {
|
if (rl != null) {
|
||||||
tableLocations.remove(rl.getRegionInfo().getStartKey());
|
tableLocations.remove(rl.getRegionInfo().getStartKey());
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if ((rl != null) && LOG.isDebugEnabled()) {
|
if ((rl != null) && LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Removed " + rl.getHostname() + ":" + rl.getPort()
|
LOG.debug("Removed " + rl.getHostname() + ":" + rl.getPort()
|
||||||
+ " as a location of " + rl.getRegionInfo().getRegionNameAsString() +
|
+ " as a location of " + rl.getRegionInfo().getRegionNameAsString() +
|
||||||
|
@ -1373,13 +1371,20 @@ public class HConnectionManager {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void clearCaches(final ServerName serverName) {
|
public void clearCaches(final ServerName serverName) {
|
||||||
boolean deletedSomething = false;
|
if (!this.cachedServers.contains(serverName)) {
|
||||||
synchronized (this.cachedRegionLocations) {
|
|
||||||
if (!cachedServers.contains(serverName)) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
for (Map<byte[], HRegionLocation> tableLocations :
|
|
||||||
cachedRegionLocations.values()) {
|
boolean deletedSomething = false;
|
||||||
|
synchronized (this.cachedServers) {
|
||||||
|
// We block here, because if there is an error on a server, it's likely that multiple
|
||||||
|
// threads will get the error simultaneously. If there are hundreds of thousand of
|
||||||
|
// region location to check, it's better to do this only once. A better pattern would
|
||||||
|
// be to check if the server is dead when we get the region location.
|
||||||
|
if (!this.cachedServers.contains(serverName)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (Map<byte[], HRegionLocation> tableLocations : cachedRegionLocations.values()) {
|
||||||
for (Entry<byte[], HRegionLocation> e : tableLocations.entrySet()) {
|
for (Entry<byte[], HRegionLocation> e : tableLocations.entrySet()) {
|
||||||
HRegionLocation value = e.getValue();
|
HRegionLocation value = e.getValue();
|
||||||
if (value != null
|
if (value != null
|
||||||
|
@ -1389,7 +1394,7 @@ public class HConnectionManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cachedServers.remove(serverName);
|
this.cachedServers.remove(serverName);
|
||||||
}
|
}
|
||||||
if (deletedSomething && LOG.isDebugEnabled()) {
|
if (deletedSomething && LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Removed all cached region locations that map to " + serverName);
|
LOG.debug("Removed all cached region locations that map to " + serverName);
|
||||||
|
@ -1404,12 +1409,14 @@ public class HConnectionManager {
|
||||||
final TableName tableName) {
|
final TableName tableName) {
|
||||||
// find the map of cached locations for this table
|
// find the map of cached locations for this table
|
||||||
ConcurrentSkipListMap<byte[], HRegionLocation> result;
|
ConcurrentSkipListMap<byte[], HRegionLocation> result;
|
||||||
synchronized (this.cachedRegionLocations) {
|
|
||||||
result = this.cachedRegionLocations.get(tableName);
|
result = this.cachedRegionLocations.get(tableName);
|
||||||
// if tableLocations for this table isn't built yet, make one
|
// if tableLocations for this table isn't built yet, make one
|
||||||
if (result == null) {
|
if (result == null) {
|
||||||
result = new ConcurrentSkipListMap<byte[], HRegionLocation>(Bytes.BYTES_COMPARATOR);
|
result = new ConcurrentSkipListMap<byte[], HRegionLocation>(Bytes.BYTES_COMPARATOR);
|
||||||
this.cachedRegionLocations.put(tableName, result);
|
ConcurrentSkipListMap<byte[], HRegionLocation> old =
|
||||||
|
this.cachedRegionLocations.putIfAbsent(tableName, result);
|
||||||
|
if (old != null) {
|
||||||
|
return old;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
|
@ -1417,18 +1424,14 @@ public class HConnectionManager {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void clearRegionCache() {
|
public void clearRegionCache() {
|
||||||
synchronized(this.cachedRegionLocations) {
|
|
||||||
this.cachedRegionLocations.clear();
|
this.cachedRegionLocations.clear();
|
||||||
this.cachedServers.clear();
|
this.cachedServers.clear();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void clearRegionCache(final TableName tableName) {
|
public void clearRegionCache(final TableName tableName) {
|
||||||
synchronized (this.cachedRegionLocations) {
|
|
||||||
this.cachedRegionLocations.remove(tableName);
|
this.cachedRegionLocations.remove(tableName);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void clearRegionCache(final byte[] tableName) {
|
public void clearRegionCache(final byte[] tableName) {
|
||||||
|
@ -1445,14 +1448,9 @@ public class HConnectionManager {
|
||||||
final HRegionLocation location) {
|
final HRegionLocation location) {
|
||||||
boolean isFromMeta = (source == null);
|
boolean isFromMeta = (source == null);
|
||||||
byte [] startKey = location.getRegionInfo().getStartKey();
|
byte [] startKey = location.getRegionInfo().getStartKey();
|
||||||
ConcurrentMap<byte[], HRegionLocation> tableLocations =
|
ConcurrentMap<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
|
||||||
getTableLocations(tableName);
|
HRegionLocation oldLocation = tableLocations.putIfAbsent(startKey, location);
|
||||||
boolean isNewCacheEntry = false;
|
boolean isNewCacheEntry = (oldLocation == null);
|
||||||
boolean isStaleUpdate = false;
|
|
||||||
HRegionLocation oldLocation = null;
|
|
||||||
synchronized (this.cachedRegionLocations) {
|
|
||||||
oldLocation = tableLocations.putIfAbsent(startKey, location);
|
|
||||||
isNewCacheEntry = (oldLocation == null);
|
|
||||||
if (isNewCacheEntry) {
|
if (isNewCacheEntry) {
|
||||||
cachedServers.add(location.getServerName());
|
cachedServers.add(location.getServerName());
|
||||||
return;
|
return;
|
||||||
|
@ -1472,15 +1470,14 @@ public class HConnectionManager {
|
||||||
// There are so many corner cases with various combinations of opens and closes that
|
// There are so many corner cases with various combinations of opens and closes that
|
||||||
// an additional counter on top of seqNum would be necessary to handle them all.
|
// an additional counter on top of seqNum would be necessary to handle them all.
|
||||||
boolean isStaleRedirect = !isFromMeta && (oldLocation.getSeqNum() >= newLocationSeqNum);
|
boolean isStaleRedirect = !isFromMeta && (oldLocation.getSeqNum() >= newLocationSeqNum);
|
||||||
isStaleUpdate = (isStaleMetaRecord || isStaleRedirect);
|
boolean isStaleUpdate = (isStaleMetaRecord || isStaleRedirect);
|
||||||
updateCache = (!isStaleUpdate);
|
updateCache = (!isStaleUpdate);
|
||||||
}
|
}
|
||||||
if (updateCache) {
|
if (updateCache) {
|
||||||
tableLocations.put(startKey, location);
|
tableLocations.replace(startKey, oldLocation, location);
|
||||||
cachedServers.add(location.getServerName());
|
cachedServers.add(location.getServerName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Map keyed by service name + regionserver to service stub implementation
|
// Map keyed by service name + regionserver to service stub implementation
|
||||||
private final ConcurrentHashMap<String, Object> stubs =
|
private final ConcurrentHashMap<String, Object> stubs =
|
||||||
|
@ -2195,11 +2192,9 @@ public class HConnectionManager {
|
||||||
* @param source The source of the error that prompts us to invalidate cache.
|
* @param source The source of the error that prompts us to invalidate cache.
|
||||||
*/
|
*/
|
||||||
void deleteCachedLocation(HRegionInfo hri, HRegionLocation source) {
|
void deleteCachedLocation(HRegionInfo hri, HRegionLocation source) {
|
||||||
synchronized (this.cachedRegionLocations) {
|
|
||||||
ConcurrentMap<byte[], HRegionLocation> tableLocations = getTableLocations(hri.getTable());
|
ConcurrentMap<byte[], HRegionLocation> tableLocations = getTableLocations(hri.getTable());
|
||||||
tableLocations.remove(hri.getStartKey(), source);
|
tableLocations.remove(hri.getStartKey(), source);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteCachedRegionLocation(final HRegionLocation location) {
|
public void deleteCachedRegionLocation(final HRegionLocation location) {
|
||||||
|
@ -2209,10 +2204,8 @@ public class HConnectionManager {
|
||||||
|
|
||||||
HRegionLocation removedLocation;
|
HRegionLocation removedLocation;
|
||||||
TableName tableName = location.getRegionInfo().getTable();
|
TableName tableName = location.getRegionInfo().getTable();
|
||||||
synchronized (this.cachedRegionLocations) {
|
|
||||||
Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
|
Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
|
||||||
removedLocation = tableLocations.remove(location.getRegionInfo().getStartKey());
|
removedLocation = tableLocations.remove(location.getRegionInfo().getStartKey());
|
||||||
}
|
|
||||||
if (LOG.isDebugEnabled() && removedLocation != null) {
|
if (LOG.isDebugEnabled() && removedLocation != null) {
|
||||||
LOG.debug("Removed " +
|
LOG.debug("Removed " +
|
||||||
location.getRegionInfo().getRegionNameAsString() +
|
location.getRegionInfo().getRegionNameAsString() +
|
||||||
|
@ -2405,14 +2398,12 @@ public class HConnectionManager {
|
||||||
* from a unit test.
|
* from a unit test.
|
||||||
*/
|
*/
|
||||||
int getNumberOfCachedRegionLocations(final TableName tableName) {
|
int getNumberOfCachedRegionLocations(final TableName tableName) {
|
||||||
synchronized (this.cachedRegionLocations) {
|
|
||||||
Map<byte[], HRegionLocation> tableLocs = this.cachedRegionLocations.get(tableName);
|
Map<byte[], HRegionLocation> tableLocs = this.cachedRegionLocations.get(tableName);
|
||||||
if (tableLocs == null) {
|
if (tableLocs == null) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
return tableLocs.values().size();
|
return tableLocs.values().size();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check the region cache to see whether a region is cached yet or not.
|
* Check the region cache to see whether a region is cached yet or not.
|
||||||
|
|
Loading…
Reference in New Issue