HBASE-9869 Optimize HConnectionManager#getCachedLocation
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1542688 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2e0f80c333
commit
c6e6ab4e3e
|
@ -35,6 +35,7 @@ import java.util.NavigableMap;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import java.util.concurrent.CopyOnWriteArraySet;
|
import java.util.concurrent.CopyOnWriteArraySet;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
@ -116,7 +117,6 @@ import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.security.UserProvider;
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
|
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
|
@ -606,9 +606,9 @@ public class HConnectionManager {
|
||||||
/**
|
/**
|
||||||
* Map of table to table {@link HRegionLocation}s.
|
* Map of table to table {@link HRegionLocation}s.
|
||||||
*/
|
*/
|
||||||
private final Map<TableName, SoftValueSortedMap<byte[], HRegionLocation>>
|
private final Map<TableName, ConcurrentSkipListMap<byte[], HRegionLocation>>
|
||||||
cachedRegionLocations =
|
cachedRegionLocations =
|
||||||
new HashMap<TableName, SoftValueSortedMap<byte[], HRegionLocation>>();
|
new HashMap<TableName, ConcurrentSkipListMap<byte[], HRegionLocation>>();
|
||||||
|
|
||||||
// The presence of a server in the map implies it's likely that there is an
|
// The presence of a server in the map implies it's likely that there is an
|
||||||
// entry in cachedRegionLocations that map to this server; but the absence
|
// entry in cachedRegionLocations that map to this server; but the absence
|
||||||
|
@ -1312,10 +1312,7 @@ public class HConnectionManager {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Search the cache for a location that fits our table and row key.
|
* Search the cache for a location that fits our table and row key.
|
||||||
* Return null if no suitable region is located. TODO: synchronization note
|
* Return null if no suitable region is located.
|
||||||
*
|
|
||||||
* <p>TODO: This method during writing consumes 15% of CPU doing lookup
|
|
||||||
* into the Soft Reference SortedMap. Improve.
|
|
||||||
*
|
*
|
||||||
* @param tableName
|
* @param tableName
|
||||||
* @param row
|
* @param row
|
||||||
|
@ -1323,24 +1320,14 @@ public class HConnectionManager {
|
||||||
*/
|
*/
|
||||||
HRegionLocation getCachedLocation(final TableName tableName,
|
HRegionLocation getCachedLocation(final TableName tableName,
|
||||||
final byte [] row) {
|
final byte [] row) {
|
||||||
SoftValueSortedMap<byte[], HRegionLocation> tableLocations =
|
ConcurrentSkipListMap<byte[], HRegionLocation> tableLocations =
|
||||||
getTableLocations(tableName);
|
getTableLocations(tableName);
|
||||||
|
|
||||||
// start to examine the cache. we can only do cache actions
|
Entry<byte[], HRegionLocation> e = tableLocations.floorEntry(row);
|
||||||
// if there's something in the cache for this table.
|
if (e == null) {
|
||||||
if (tableLocations.isEmpty()) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
HRegionLocation possibleRegion = tableLocations.get(row);
|
|
||||||
if (possibleRegion != null) {
|
|
||||||
return possibleRegion;
|
|
||||||
}
|
|
||||||
|
|
||||||
possibleRegion = tableLocations.lowerValueByKey(row);
|
|
||||||
if (possibleRegion == null) {
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
HRegionLocation possibleRegion = e.getValue();
|
||||||
|
|
||||||
// make sure that the end key is greater than the row we're looking
|
// make sure that the end key is greater than the row we're looking
|
||||||
// for, otherwise the row actually belongs in the next region, not
|
// for, otherwise the row actually belongs in the next region, not
|
||||||
|
@ -1369,11 +1356,9 @@ public class HConnectionManager {
|
||||||
Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
|
Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
|
||||||
// start to examine the cache. we can only do cache actions
|
// start to examine the cache. we can only do cache actions
|
||||||
// if there's something in the cache for this table.
|
// if there's something in the cache for this table.
|
||||||
if (!tableLocations.isEmpty()) {
|
rl = getCachedLocation(tableName, row);
|
||||||
rl = getCachedLocation(tableName, row);
|
if (rl != null) {
|
||||||
if (rl != null) {
|
tableLocations.remove(rl.getRegionInfo().getStartKey());
|
||||||
tableLocations.remove(rl.getRegionInfo().getStartKey());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if ((rl != null) && LOG.isDebugEnabled()) {
|
if ((rl != null) && LOG.isDebugEnabled()) {
|
||||||
|
@ -1415,15 +1400,15 @@ public class HConnectionManager {
|
||||||
* @param tableName
|
* @param tableName
|
||||||
* @return Map of cached locations for passed <code>tableName</code>
|
* @return Map of cached locations for passed <code>tableName</code>
|
||||||
*/
|
*/
|
||||||
private SoftValueSortedMap<byte[], HRegionLocation> getTableLocations(
|
private ConcurrentSkipListMap<byte[], HRegionLocation> getTableLocations(
|
||||||
final TableName tableName) {
|
final TableName tableName) {
|
||||||
// find the map of cached locations for this table
|
// find the map of cached locations for this table
|
||||||
SoftValueSortedMap<byte[], HRegionLocation> result;
|
ConcurrentSkipListMap<byte[], HRegionLocation> result;
|
||||||
synchronized (this.cachedRegionLocations) {
|
synchronized (this.cachedRegionLocations) {
|
||||||
result = this.cachedRegionLocations.get(tableName);
|
result = this.cachedRegionLocations.get(tableName);
|
||||||
// if tableLocations for this table isn't built yet, make one
|
// if tableLocations for this table isn't built yet, make one
|
||||||
if (result == null) {
|
if (result == null) {
|
||||||
result = new SoftValueSortedMap<byte[], HRegionLocation>(Bytes.BYTES_COMPARATOR);
|
result = new ConcurrentSkipListMap<byte[], HRegionLocation>(Bytes.BYTES_COMPARATOR);
|
||||||
this.cachedRegionLocations.put(tableName, result);
|
this.cachedRegionLocations.put(tableName, result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1460,17 +1445,23 @@ public class HConnectionManager {
|
||||||
final HRegionLocation location) {
|
final HRegionLocation location) {
|
||||||
boolean isFromMeta = (source == null);
|
boolean isFromMeta = (source == null);
|
||||||
byte [] startKey = location.getRegionInfo().getStartKey();
|
byte [] startKey = location.getRegionInfo().getStartKey();
|
||||||
Map<byte[], HRegionLocation> tableLocations =
|
ConcurrentMap<byte[], HRegionLocation> tableLocations =
|
||||||
getTableLocations(tableName);
|
getTableLocations(tableName);
|
||||||
boolean isNewCacheEntry = false;
|
boolean isNewCacheEntry = false;
|
||||||
boolean isStaleUpdate = false;
|
boolean isStaleUpdate = false;
|
||||||
HRegionLocation oldLocation = null;
|
HRegionLocation oldLocation = null;
|
||||||
synchronized (this.cachedRegionLocations) {
|
synchronized (this.cachedRegionLocations) {
|
||||||
cachedServers.add(location.getServerName());
|
oldLocation = tableLocations.putIfAbsent(startKey, location);
|
||||||
oldLocation = tableLocations.get(startKey);
|
|
||||||
isNewCacheEntry = (oldLocation == null);
|
isNewCacheEntry = (oldLocation == null);
|
||||||
|
if (isNewCacheEntry){
|
||||||
|
cachedServers.add(location.getServerName());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
boolean updateCache;
|
||||||
// If the server in cache sends us a redirect, assume it's always valid.
|
// If the server in cache sends us a redirect, assume it's always valid.
|
||||||
if (!isNewCacheEntry && !oldLocation.equals(source)) {
|
if (oldLocation.equals(source)) {
|
||||||
|
updateCache = true;
|
||||||
|
} else {
|
||||||
long newLocationSeqNum = location.getSeqNum();
|
long newLocationSeqNum = location.getSeqNum();
|
||||||
// Meta record is stale - some (probably the same) server has closed the region
|
// Meta record is stale - some (probably the same) server has closed the region
|
||||||
// with later seqNum and told us about the new location.
|
// with later seqNum and told us about the new location.
|
||||||
|
@ -1482,23 +1473,11 @@ public class HConnectionManager {
|
||||||
// an additional counter on top of seqNum would be necessary to handle them all.
|
// an additional counter on top of seqNum would be necessary to handle them all.
|
||||||
boolean isStaleRedirect = !isFromMeta && (oldLocation.getSeqNum() >= newLocationSeqNum);
|
boolean isStaleRedirect = !isFromMeta && (oldLocation.getSeqNum() >= newLocationSeqNum);
|
||||||
isStaleUpdate = (isStaleMetaRecord || isStaleRedirect);
|
isStaleUpdate = (isStaleMetaRecord || isStaleRedirect);
|
||||||
|
updateCache = (!isStaleUpdate);
|
||||||
}
|
}
|
||||||
if (!isStaleUpdate) {
|
if (updateCache) {
|
||||||
tableLocations.put(startKey, location);
|
tableLocations.put(startKey, location);
|
||||||
}
|
cachedServers.add(location.getServerName());
|
||||||
}
|
|
||||||
if (isNewCacheEntry) {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Cached location for " +
|
|
||||||
location.getRegionInfo().getRegionNameAsString() +
|
|
||||||
" is " + location.getHostnamePort());
|
|
||||||
}
|
|
||||||
} else if (isStaleUpdate && !location.equals(oldLocation)) {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Ignoring stale location update for "
|
|
||||||
+ location.getRegionInfo().getRegionNameAsString() + ": "
|
|
||||||
+ location.getHostnamePort() + " at " + location.getSeqNum() + "; local "
|
|
||||||
+ oldLocation.getHostnamePort() + " at " + oldLocation.getSeqNum());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2207,9 +2186,7 @@ public class HConnectionManager {
|
||||||
void updateCachedLocation(HRegionInfo hri, HRegionLocation source,
|
void updateCachedLocation(HRegionInfo hri, HRegionLocation source,
|
||||||
ServerName serverName, long seqNum) {
|
ServerName serverName, long seqNum) {
|
||||||
HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
|
HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
|
||||||
synchronized (this.cachedRegionLocations) {
|
cacheLocation(hri.getTable(), source, newHrl);
|
||||||
cacheLocation(hri.getTable(), source, newHrl);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2218,18 +2195,9 @@ public class HConnectionManager {
|
||||||
* @param source The source of the error that prompts us to invalidate cache.
|
* @param source The source of the error that prompts us to invalidate cache.
|
||||||
*/
|
*/
|
||||||
void deleteCachedLocation(HRegionInfo hri, HRegionLocation source) {
|
void deleteCachedLocation(HRegionInfo hri, HRegionLocation source) {
|
||||||
boolean isStaleDelete = false;
|
|
||||||
HRegionLocation oldLocation;
|
|
||||||
synchronized (this.cachedRegionLocations) {
|
synchronized (this.cachedRegionLocations) {
|
||||||
Map<byte[], HRegionLocation> tableLocations = getTableLocations(hri.getTable());
|
ConcurrentMap<byte[], HRegionLocation> tableLocations = getTableLocations(hri.getTable());
|
||||||
oldLocation = tableLocations.get(hri.getStartKey());
|
tableLocations.remove(hri.getStartKey(), source);
|
||||||
if (oldLocation != null) {
|
|
||||||
// Do not delete the cache entry if it's not for the same server that gave us the error.
|
|
||||||
isStaleDelete = (source != null) && !oldLocation.equals(source);
|
|
||||||
if (!isStaleDelete) {
|
|
||||||
tableLocations.remove(hri.getStartKey());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2238,21 +2206,18 @@ public class HConnectionManager {
|
||||||
if (location == null) {
|
if (location == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
HRegionLocation removedLocation;
|
||||||
|
TableName tableName = location.getRegionInfo().getTable();
|
||||||
synchronized (this.cachedRegionLocations) {
|
synchronized (this.cachedRegionLocations) {
|
||||||
TableName tableName = location.getRegionInfo().getTable();
|
Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
|
||||||
Map<byte[], HRegionLocation> tableLocations =
|
removedLocation = tableLocations.remove(location.getRegionInfo().getStartKey());
|
||||||
getTableLocations(tableName);
|
}
|
||||||
if (!tableLocations.isEmpty()) {
|
if (LOG.isDebugEnabled() && removedLocation != null) {
|
||||||
// Delete if there's something in the cache for this region.
|
LOG.debug("Removed " +
|
||||||
HRegionLocation removedLocation =
|
location.getRegionInfo().getRegionNameAsString() +
|
||||||
tableLocations.remove(location.getRegionInfo().getStartKey());
|
" for tableName=" + tableName +
|
||||||
if (LOG.isDebugEnabled() && removedLocation != null) {
|
" from cache");
|
||||||
LOG.debug("Removed " +
|
|
||||||
location.getRegionInfo().getRegionNameAsString() +
|
|
||||||
" for tableName=" + tableName +
|
|
||||||
" from cache");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue