HBASE-27650 Merging empty regions corrupts meta cache (branch-2) (#5038)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
parent 3b6d8c3394
commit 53d3e78432
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -23,15 +23,12 @@ import static org.apache.hadoop.hbase.HConstants.NINES;
 import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
 import static org.apache.hadoop.hbase.HConstants.ZEROES;
 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
 import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
 import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
-import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
 import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
 import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
-import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
 
 import java.io.IOException;
@@ -47,8 +44,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.hadoop.hbase.CatalogReplicaMode;
@@ -66,8 +61,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Objects;
-
 /**
  * The asynchronous locator for regions other than meta.
  */
@@ -146,13 +139,15 @@ class AsyncNonMetaRegionLocator {
 
   private static final class TableCache {
 
-    private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
-      new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
-
     private final Set<LocateRequest> pendingRequests = new HashSet<>();
 
     private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
       new LinkedHashMap<>();
 
+    private final AsyncRegionLocationCache regionLocationCache;
+
+    public TableCache(TableName tableName) {
+      regionLocationCache = new AsyncRegionLocationCache(tableName);
+    }
+
     public boolean hasQuota(int max) {
       return pendingRequests.size() < max;
@@ -262,76 +257,7 @@ class AsyncNonMetaRegionLocator {
   }
 
   private TableCache getTableCache(TableName tableName) {
-    return computeIfAbsent(cache, tableName, TableCache::new);
-  }
-
-  private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
-    HRegionLocation[] locArr1 = locs1.getRegionLocations();
-    HRegionLocation[] locArr2 = locs2.getRegionLocations();
-    if (locArr1.length != locArr2.length) {
-      return false;
-    }
-    for (int i = 0; i < locArr1.length; i++) {
-      // do not need to compare region info
-      HRegionLocation loc1 = locArr1[i];
-      HRegionLocation loc2 = locArr2[i];
-      if (loc1 == null) {
-        if (loc2 != null) {
-          return false;
-        }
-      } else {
-        if (loc2 == null) {
-          return false;
-        }
-        if (loc1.getSeqNum() != loc2.getSeqNum()) {
-          return false;
-        }
-        if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
-  // if we successfully add the locations to cache, return the locations, otherwise return the one
-  // which prevents us being added. The upper layer can use this value to complete pending requests.
-  private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) {
-    LOG.trace("Try adding {} to cache", locs);
-    byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
-    for (;;) {
-      RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
-      if (oldLocs == null) {
-        return locs;
-      }
-      // check whether the regions are the same, this usually happens when table is split/merged, or
-      // deleted and recreated again.
-      RegionInfo region = locs.getRegionLocation().getRegion();
-      RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
-      if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
-        RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
-        if (isEqual(mergedLocs, oldLocs)) {
-          // the merged one is the same with the old one, give up
-          LOG.trace("Will not add {} to cache because the old value {} "
-            + " is newer than us or has the same server name."
-            + " Maybe it is updated before we replace it", locs, oldLocs);
-          return oldLocs;
-        }
-        if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
-          return mergedLocs;
-        }
-      } else {
-        // the region is different, here we trust the one we fetched. This maybe wrong but finally
-        // the upper layer can detect this and trigger removal of the wrong locations
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}',"
-            + " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
-        }
-        if (tableCache.cache.replace(startKey, oldLocs, locs)) {
-          return locs;
-        }
-      }
-    }
+    return computeIfAbsent(cache, tableName, () -> new TableCache(tableName));
   }
 
   private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
@@ -343,7 +269,7 @@ class AsyncNonMetaRegionLocator {
     Optional<LocateRequest> toSend = Optional.empty();
     TableCache tableCache = getTableCache(tableName);
     if (locs != null) {
-      RegionLocations addedLocs = addToCache(tableCache, locs);
+      RegionLocations addedLocs = tableCache.regionLocationCache.add(locs);
       List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
       synchronized (tableCache) {
         tableCache.pendingRequests.remove(req);
@@ -421,62 +347,24 @@ class AsyncNonMetaRegionLocator {
     conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
   }
 
-  private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
-    int replicaId) {
-    Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
-    if (entry == null) {
+  private RegionLocations locateRowInCache(TableCache tableCache, byte[] row, int replicaId) {
+    RegionLocations locs = tableCache.regionLocationCache.findForRow(row, replicaId);
+    if (locs == null) {
       recordCacheMiss();
-      return null;
-    }
-    RegionLocations locs = entry.getValue();
-    HRegionLocation loc = locs.getRegionLocation(replicaId);
-    if (loc == null) {
-      recordCacheMiss();
-      return null;
-    }
-    byte[] endKey = loc.getRegion().getEndKey();
-    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
-          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
-      }
-      recordCacheHit();
-      return locs;
     } else {
-      recordCacheMiss();
-      return null;
+      recordCacheHit();
     }
+    return locs;
   }
 
-  private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
-    byte[] row, int replicaId) {
-    boolean isEmptyStopRow = isEmptyStopRow(row);
-    Map.Entry<byte[], RegionLocations> entry =
-      isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
-    if (entry == null) {
+  private RegionLocations locateRowBeforeInCache(TableCache tableCache, byte[] row, int replicaId) {
+    RegionLocations locs = tableCache.regionLocationCache.findForBeforeRow(row, replicaId);
+    if (locs == null) {
       recordCacheMiss();
-      return null;
-    }
-    RegionLocations locs = entry.getValue();
-    HRegionLocation loc = locs.getRegionLocation(replicaId);
-    if (loc == null) {
-      recordCacheMiss();
-      return null;
-    }
-    if (
-      isEmptyStopRow(loc.getRegion().getEndKey())
-        || (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)
-    ) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
-          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
-      }
-      recordCacheHit();
-      return locs;
     } else {
-      recordCacheMiss();
-      return null;
+      recordCacheHit();
     }
+    return locs;
   }
 
   private void locateInMeta(TableName tableName, LocateRequest req) {
@@ -570,7 +458,7 @@ class AsyncNonMetaRegionLocator {
         if (info == null || info.isOffline() || info.isSplitParent()) {
           continue;
         }
-        RegionLocations addedLocs = addToCache(tableCache, locs);
+        RegionLocations addedLocs = tableCache.regionLocationCache.add(locs);
         List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
         synchronized (tableCache) {
           futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
@@ -582,11 +470,11 @@ class AsyncNonMetaRegionLocator {
     });
   }
 
-  private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
-    int replicaId, RegionLocateType locateType) {
+  private RegionLocations locateInCache(TableCache tableCache, byte[] row, int replicaId,
+    RegionLocateType locateType) {
     return locateType.equals(RegionLocateType.BEFORE)
-      ? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
-      : locateRowInCache(tableCache, tableName, row, replicaId);
+      ? locateRowBeforeInCache(tableCache, row, replicaId)
+      : locateRowInCache(tableCache, row, replicaId);
   }
 
   // locateToPrevious is true means we will use the start key of a region to locate the region
@@ -598,7 +486,7 @@ class AsyncNonMetaRegionLocator {
     assert !locateType.equals(RegionLocateType.AFTER);
     TableCache tableCache = getTableCache(tableName);
     if (!reload) {
      RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
-      RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
+      RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType);
       if (isGood(locs, replicaId)) {
         return CompletableFuture.completedFuture(locs);
       }
@@ -609,7 +497,7 @@ class AsyncNonMetaRegionLocator {
     synchronized (tableCache) {
       // check again
       if (!reload) {
-        RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
+        RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType);
         if (isGood(locs, replicaId)) {
           return CompletableFuture.completedFuture(locs);
         }
@@ -648,43 +536,25 @@ class AsyncNonMetaRegionLocator {
 
   private void removeLocationFromCache(HRegionLocation loc) {
     TableCache tableCache = cache.get(loc.getRegion().getTable());
-    if (tableCache == null) {
-      return;
+    if (tableCache != null) {
+      if (tableCache.regionLocationCache.remove(loc)) {
+        recordClearRegionCache();
+        updateMetaReplicaSelector(loc);
+      }
     }
-    byte[] startKey = loc.getRegion().getStartKey();
-    for (;;) {
-      RegionLocations oldLocs = tableCache.cache.get(startKey);
-      if (oldLocs == null) {
-        return;
-      }
-      HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
-      if (!canUpdateOnError(loc, oldLoc)) {
-        return;
-      }
+  }
 
-      // Tell metaReplicaSelector that the location is stale. It will create a stale entry
-      // with timestamp internally. Next time the client looks up the same location,
-      // it will pick a different meta replica region.
-      if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
-        metaReplicaSelector.onError(loc);
-      }
-
-      RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
-      if (newLocs == null) {
-        if (tableCache.cache.remove(startKey, oldLocs)) {
-          recordClearRegionCache();
-          return;
-        }
-      } else {
-        if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
-          recordClearRegionCache();
-          return;
-        }
-      }
-    }
+  private void updateMetaReplicaSelector(HRegionLocation loc) {
+    // Tell metaReplicaSelector that the location is stale. It will create a stale entry
+    // with timestamp internally. Next time the client looks up the same location,
+    // it will pick a different meta replica region.
+    if (metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
+      metaReplicaSelector.onError(loc);
+    }
   }
 
   void addLocationToCache(HRegionLocation loc) {
-    addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
+    getTableCache(loc.getRegion().getTable()).regionLocationCache.add(createRegionLocations(loc));
   }
 
   private HRegionLocation getCachedLocation(HRegionLocation loc) {
@@ -692,7 +562,7 @@ class AsyncNonMetaRegionLocator {
     if (tableCache == null) {
       return null;
     }
-    RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey());
+    RegionLocations locs = tableCache.regionLocationCache.get(loc.getRegion().getStartKey());
     return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
   }
 
@@ -717,8 +587,8 @@ class AsyncNonMetaRegionLocator {
       }
     }
     futureResultList.forEach(RegionLocationsFutureResult::complete);
-    conn.getConnectionMetrics()
-      .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
+    conn.getConnectionMetrics().ifPresent(
+      metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.regionLocationCache.size()));
   }
 
   void clearCache() {
@@ -727,19 +597,7 @@ class AsyncNonMetaRegionLocator {
 
   void clearCache(ServerName serverName) {
     for (TableCache tableCache : cache.values()) {
-      for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) {
-        byte[] regionName = entry.getKey();
-        RegionLocations locs = entry.getValue();
-        RegionLocations newLocs = locs.removeByServer(serverName);
-        if (locs == newLocs) {
-          continue;
-        }
-        if (newLocs.isEmpty()) {
-          tableCache.cache.remove(regionName, locs);
-        } else {
-          tableCache.cache.replace(regionName, locs, newLocs);
-        }
-      }
+      tableCache.regionLocationCache.removeForServer(serverName);
     }
   }
 
@@ -749,6 +607,7 @@ class AsyncNonMetaRegionLocator {
     if (tableCache == null) {
       return null;
     }
-    return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
+    return locateRowInCache(tableCache, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
   }
 
 }
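
The corruption scenario behind HBASE-27650, in miniature: the per-table cache maps a region's start key to its locations and answers lookups with floorEntry(row). Caching a merged region only overwrites the entry stored under its own start key, so a merge parent with a different start key survives and shadows part of the merged range. A hypothetical, self-contained demo, not part of this commit (String keys stand in for the byte[] start and end keys):

    import java.util.Map;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class StaleMergeParentDemo {
      public static void main(String[] args) {
        // startKey -> endKey, queried with floorEntry() like the real cache
        ConcurrentSkipListMap<String, String> cache = new ConcurrentSkipListMap<>();
        cache.put("b", "c"); // leftover merge parent [b, c)
        cache.put("a", "d"); // newly fetched merged region [a, d)

        // Any row in [c, d) floors to the stale parent...
        Map.Entry<String, String> e = cache.floorEntry("cc");
        System.out.println(e); // prints b=c, but "cc" >= end key "c": cache miss
      }
    }

Every lookup in [c, d) misses, refetches the merged region, and re-puts it under "a", so the stale entry at "b" is never displaced; this is the loop that the cleanProblematicOverlappedRegions cleanup introduced below breaks.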
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocationCache.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
+import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Objects;
+
+/**
+ * Cache of RegionLocations for use by {@link AsyncNonMetaRegionLocator}. Wrapper around
+ * ConcurrentSkipListMap ensuring proper access to cached items. Updates are synchronized, but reads
+ * are not.
+ */
+final class AsyncRegionLocationCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocationCache.class);
+
+  private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
+    new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
+  private final TableName tableName;
+
+  public AsyncRegionLocationCache(TableName tableName) {
+    this.tableName = tableName;
+  }
+
+  /**
+   * Add the given locations to the cache, merging with existing if necessary. Also cleans out any
+   * previously cached locations which may have been superseded by this one (i.e. in case of merged
+   * regions). See {@link MetaCacheUtil} cleanProblematicOverlappedRegions
+   * @param locs the locations to cache
+   * @return the final location (possibly merged) that was added to the cache
+   */
+  public synchronized RegionLocations add(RegionLocations locs) {
+    byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
+    RegionLocations oldLocs = cache.putIfAbsent(startKey, locs);
+    if (oldLocs == null) {
+      MetaCacheUtil.cleanProblematicOverlappedRegions(locs, cache);
+      return locs;
+    }
+
+    // check whether the regions are the same, this usually happens when table is split/merged,
+    // or deleted and recreated again.
+    RegionInfo region = locs.getRegionLocation().getRegion();
+    RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
+    if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
+      RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
+      if (isEqual(mergedLocs, oldLocs)) {
+        // the merged one is the same with the old one, give up
+        LOG.trace("Will not add {} to cache because the old value {} "
+          + " is newer than us or has the same server name."
+          + " Maybe it is updated before we replace it", locs, oldLocs);
+        return oldLocs;
+      }
+      locs = mergedLocs;
+    } else {
+      // the region is different, here we trust the one we fetched. This maybe wrong but finally
+      // the upper layer can detect this and trigger removal of the wrong locations
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The newly fetch region {} is different from the old one {} for row '{}',"
+          + " try replaying the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
+      }
+    }
+
+    cache.put(startKey, locs);
+    MetaCacheUtil.cleanProblematicOverlappedRegions(locs, cache);
+    return locs;
+  }
+
+  private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
+    HRegionLocation[] locArr1 = locs1.getRegionLocations();
+    HRegionLocation[] locArr2 = locs2.getRegionLocations();
+    if (locArr1.length != locArr2.length) {
+      return false;
+    }
+    for (int i = 0; i < locArr1.length; i++) {
+      // do not need to compare region info
+      HRegionLocation loc1 = locArr1[i];
+      HRegionLocation loc2 = locArr2[i];
+      if (loc1 == null) {
+        if (loc2 != null) {
+          return false;
+        }
+      } else {
+        if (loc2 == null) {
+          return false;
+        }
+        if (loc1.getSeqNum() != loc2.getSeqNum()) {
+          return false;
+        }
+        if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns all cached RegionLocations
+   */
+  public Collection<RegionLocations> getAll() {
+    return Collections.unmodifiableCollection(cache.values());
+  }
+
+  /**
+   * Gets the RegionLocations for a given region's startKey. This is a direct lookup, if the key
+   * does not exist in the cache it will return null.
+   * @param startKey region start key to directly look up
+   */
+  public RegionLocations get(byte[] startKey) {
+    return cache.get(startKey);
+  }
+
+  /**
+   * Finds the RegionLocations for the region with the greatest startKey less than or equal to the
+   * given row
+   * @param row row to find locations
+   */
+  public RegionLocations findForRow(byte[] row, int replicaId) {
+    Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    if (locs == null) {
+      return null;
+    }
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    byte[] endKey = loc.getRegion().getEndKey();
+    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
+      }
+      return locs;
+    } else {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Requested row {} comes after region end key of {} for cached location {}",
+          Bytes.toStringBinary(row), Bytes.toStringBinary(endKey), locs);
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Finds the RegionLocations for the region with the greatest startKey strictly less than the
+   * given row
+   * @param row row to find locations
+   */
+  public RegionLocations findForBeforeRow(byte[] row, int replicaId) {
+    boolean isEmptyStopRow = isEmptyStopRow(row);
+    Map.Entry<byte[], RegionLocations> entry =
+      isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    if (locs == null) {
+      return null;
+    }
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    if (
+      isEmptyStopRow(loc.getRegion().getEndKey())
+        || (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)
+    ) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
+      }
+      return locs;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Removes the location from the cache if it exists and can be removed.
+   * @return true if entry was removed
+   */
+  public synchronized boolean remove(HRegionLocation loc) {
+    byte[] startKey = loc.getRegion().getStartKey();
+    RegionLocations oldLocs = cache.get(startKey);
+    if (oldLocs == null) {
+      return false;
+    }
+
+    HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
+    if (!canUpdateOnError(loc, oldLoc)) {
+      return false;
+    }
+
+    RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
+    if (newLocs == null) {
+      if (cache.remove(startKey, oldLocs)) {
+        return true;
+      }
+    } else {
+      cache.put(startKey, newLocs);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Returns the size of the region locations cache
+   */
+  public int size() {
+    return cache.size();
+  }
+
+  /**
+   * Removes serverName from all locations in the cache, fully removing any RegionLocations which
+   * are empty after removing the server from it.
+   * @param serverName server to remove from locations
+   */
+  public synchronized void removeForServer(ServerName serverName) {
+    for (Map.Entry<byte[], RegionLocations> entry : cache.entrySet()) {
+      byte[] regionName = entry.getKey();
+      RegionLocations locs = entry.getValue();
+      RegionLocations newLocs = locs.removeByServer(serverName);
+      if (locs == newLocs) {
+        continue;
+      }
+      if (newLocs.isEmpty()) {
+        cache.remove(regionName, locs);
+      } else {
+        cache.put(regionName, newLocs);
+      }
+    }
+  }
+}
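
A sketch of how the locator consumes the new class: reads (findForRow, findForBeforeRow, get) are unsynchronized, while add(), remove(), and removeForServer() are synchronized so the merge-and-clean sequence is atomic. This is a hypothetical same-package snippet (AsyncRegionLocationCache is package-private), and fetchFromMeta() is a stand-in helper, not a real API:

    package org.apache.hadoop.hbase.client;

    import org.apache.hadoop.hbase.RegionLocations;

    class AsyncRegionLocationCacheSketch {
      static RegionLocations locate(AsyncRegionLocationCache cache, byte[] row) {
        // Unsynchronized read: greatest start key <= row, validated against
        // the region's end key inside findForRow()
        RegionLocations locs =
          cache.findForRow(row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
        if (locs == null) {
          // Miss: fetch from meta, then add(). The synchronized add() merges
          // with any concurrent update, evicts overlapped merge-parent
          // entries, and returns what actually ended up in the cache.
          locs = cache.add(fetchFromMeta(row));
        }
        return locs;
      }

      private static RegionLocations fetchFromMeta(byte[] row) {
        throw new UnsupportedOperationException("stand-in for a meta lookup");
      }
    }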
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -112,12 +112,6 @@ public interface ClusterConnection extends Connection {
    */
   void clearRegionCache(final TableName tableName);
 
-  /**
-   * Deletes cached locations for the specific region.
-   * @param location The location object for the region, to be purged from cache.
-   */
-  void deleteCachedRegionLocation(final HRegionLocation location);
-
   /**
    * Find the location of the region of <i>tableName</i> that <i>row</i> lives in, ignoring any
    * value that might be in the cache.
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -2066,11 +2066,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
     cacheLocation(hri.getTable(), source, newHrl);
   }
 
-  @Override
-  public void deleteCachedRegionLocation(final HRegionLocation location) {
-    metaCache.clearCache(location);
-  }
-
   /**
    * Update the location with the new value (if the exception is a RegionMovedException) or delete
    * it from the cache. Does nothing if we can be sure from the exception that the location is still
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
@@ -45,7 +45,9 @@ public class MetaCache {
   private static final Logger LOG = LoggerFactory.getLogger(MetaCache.class);
 
   /**
-   * Map of table to table {@link HRegionLocation}s.
+   * Map of table to table {@link HRegionLocation}s. <br>
+   * Despite being Concurrent, writes to the map should be synchronized because we have cases where
+   * we need to make multiple updates atomically.
    */
   private final ConcurrentMap<TableName,
     ConcurrentNavigableMap<byte[], RegionLocations>> cachedRegionLocations =
@@ -101,22 +103,28 @@ public class MetaCache {
       return possibleRegion;
     }
 
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Requested row {} comes after region end key of {} for cached location {}",
+        Bytes.toStringBinary(row), Bytes.toStringBinary(endKey), possibleRegion);
+    }
     // Passed all the way through, so we got nothing - complete cache miss
     if (metrics != null) metrics.incrMetaCacheMiss();
     return null;
   }
 
   /**
-   * Put a newly discovered HRegionLocation into the cache.
+   * Put a newly discovered HRegionLocation into the cache. Synchronize here because we may need to
+   * make multiple modifications in cleanProblematicOverlappedRegions, and we want them to be
+   * atomic.
    * @param tableName The table name.
    * @param source    the source of the new location
    * @param location  the new location
    */
-  public void cacheLocation(final TableName tableName, final ServerName source,
+  public synchronized void cacheLocation(final TableName tableName, final ServerName source,
     final HRegionLocation location) {
     assert source != null;
     byte[] startKey = location.getRegion().getStartKey();
-    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
+    ConcurrentNavigableMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
     RegionLocations locations = new RegionLocations(new HRegionLocation[] { location });
     RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations);
     boolean isNewCacheEntry = (oldLocations == null);
@@ -125,6 +133,7 @@ public class MetaCache {
       LOG.trace("Cached location: " + location);
       }
       addToCachedServers(locations);
+      MetaCacheUtil.cleanProblematicOverlappedRegions(locations, tableLocations);
       return;
     }
 
@@ -141,8 +150,9 @@ public class MetaCache {
     // an additional counter on top of seqNum would be necessary to handle them all.
     RegionLocations updatedLocations = oldLocations.updateLocation(location, false, force);
     if (oldLocations != updatedLocations) {
-      boolean replaced = tableLocations.replace(startKey, oldLocations, updatedLocations);
-      if (replaced && LOG.isTraceEnabled()) {
+      tableLocations.put(startKey, updatedLocations);
+      MetaCacheUtil.cleanProblematicOverlappedRegions(updatedLocations, tableLocations);
+      if (LOG.isTraceEnabled()) {
         LOG.trace("Changed cached location to: " + location);
       }
       addToCachedServers(updatedLocations);
@@ -150,13 +160,16 @@ public class MetaCache {
   }
 
   /**
-   * Put a newly discovered HRegionLocation into the cache.
+   * Put a newly discovered HRegionLocation into the cache. Synchronize here because we may need to
+   * make multiple modifications in cleanProblematicOverlappedRegions, and we want them to be
+   * atomic.
    * @param tableName The table name.
    * @param locations the new locations
    */
-  public void cacheLocation(final TableName tableName, final RegionLocations locations) {
+  public synchronized void cacheLocation(final TableName tableName,
+    final RegionLocations locations) {
     byte[] startKey = locations.getRegionLocation().getRegion().getStartKey();
-    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
+    ConcurrentNavigableMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
     RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations);
     boolean isNewCacheEntry = (oldLocation == null);
     if (isNewCacheEntry) {
@@ -164,6 +177,7 @@ public class MetaCache {
       LOG.trace("Cached location: " + locations);
       }
       addToCachedServers(locations);
+      MetaCacheUtil.cleanProblematicOverlappedRegions(locations, tableLocations);
       return;
     }
 
@@ -171,8 +185,9 @@ public class MetaCache {
     // Meta record might be stale - some (probably the same) server has closed the region
     // with later seqNum and told us about the new location.
     RegionLocations mergedLocation = oldLocation.mergeLocations(locations);
-    boolean replaced = tableLocations.replace(startKey, oldLocation, mergedLocation);
-    if (replaced && LOG.isTraceEnabled()) {
+    tableLocations.put(startKey, mergedLocation);
+    MetaCacheUtil.cleanProblematicOverlappedRegions(mergedLocation, tableLocations);
+    if (LOG.isTraceEnabled()) {
       LOG.trace("Merged cached locations: " + mergedLocation);
     }
     addToCachedServers(locations);
@@ -186,7 +201,11 @@ public class MetaCache {
     }
   }
 
-  /** Returns Map of cached locations for passed <code>tableName</code> */
+  /**
+   * Returns Map of cached locations for passed <code>tableName</code>.<br>
+   * Despite being Concurrent, writes to the map should be synchronized because we have cases where
+   * we need to make multiple updates atomically.
+   */
   private ConcurrentNavigableMap<byte[], RegionLocations>
     getTableLocations(final TableName tableName) {
     // find the map of cached locations for this table
@@ -221,23 +240,22 @@ public class MetaCache {
   }
 
   /**
-   * Delete all cached entries.
+   * Delete all cached entries. <br>
+   * Synchronized because of calls in cacheLocation which need to be executed atomically
    */
-  public void clearCache() {
+  public synchronized void clearCache() {
     this.cachedRegionLocations.clear();
     this.cachedServers.clear();
   }
 
   /**
-   * Delete all cached entries of a server.
+   * Delete all cached entries of a server. <br>
+   * Synchronized because of calls in cacheLocation which need to be executed atomically
    */
-  public void clearCache(final ServerName serverName) {
-    if (!this.cachedServers.contains(serverName)) {
-      return;
-    }
-
-    boolean deletedSomething = false;
-    synchronized (this.cachedServers) {
+  public synchronized void clearCache(final ServerName serverName) {
+    // Prior to synchronizing this method, we used to do another check below while synchronizing
+    // on cachedServers. This is no longer necessary since we moved synchronization up.
+    // Prior reason:
     // We block here, because if there is an error on a server, it's likely that multiple
     // threads will get the error simultaneously. If there are hundreds of thousand of
     // region location to check, it's better to do this only once. A better pattern would
@@ -245,24 +263,25 @@ public class MetaCache {
     if (!this.cachedServers.contains(serverName)) {
       return;
     }
 
+    boolean deletedSomething = false;
     for (ConcurrentMap<byte[], RegionLocations> tableLocations : cachedRegionLocations.values()) {
       for (Entry<byte[], RegionLocations> e : tableLocations.entrySet()) {
         RegionLocations regionLocations = e.getValue();
         if (regionLocations != null) {
           RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
           if (updatedLocations != regionLocations) {
+            deletedSomething = true;
             if (updatedLocations.isEmpty()) {
-              deletedSomething |= tableLocations.remove(e.getKey(), regionLocations);
+              tableLocations.remove(e.getKey());
             } else {
-              deletedSomething |=
-                tableLocations.replace(e.getKey(), regionLocations, updatedLocations);
+              tableLocations.put(e.getKey(), updatedLocations);
             }
           }
         }
       }
     }
     this.cachedServers.remove(serverName);
-    }
     if (deletedSomething) {
       if (metrics != null) {
         metrics.incrMetaCacheNumClearServer();
@@ -274,27 +293,17 @@ public class MetaCache {
   }
 
   /**
-   * Delete all cached entries of a table.
-   */
-  public void clearCache(final TableName tableName) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Removed all cached region locations for table " + tableName);
-    }
-    this.cachedRegionLocations.remove(tableName);
-  }
-
-  /**
-   * Delete a cached location, no matter what it is. Called when we were told to not use cache.
+   * Delete a cached location, no matter what it is. Called when we were told to not use cache.<br>
+   * Synchronized because of calls in cacheLocation which need to be executed atomically
    * @param tableName tableName
    */
-  public void clearCache(final TableName tableName, final byte[] row) {
+  public synchronized void clearCache(final TableName tableName, final byte[] row) {
     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
 
     RegionLocations regionLocations = getCachedLocation(tableName, row);
     if (regionLocations != null) {
       byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey();
-      boolean removed = tableLocations.remove(startKey, regionLocations);
-      if (removed) {
+      tableLocations.remove(startKey);
       if (metrics != null) {
         metrics.incrMetaCacheNumClearRegion();
       }
@@ -303,15 +312,26 @@ public class MetaCache {
       }
     }
   }
 
+  /**
+   * Delete all cached entries of a table.<br>
+   * Synchronized because of calls in cacheLocation which need to be executed atomically
+   */
+  public synchronized void clearCache(final TableName tableName) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Removed all cached region locations for table " + tableName);
+    }
+    this.cachedRegionLocations.remove(tableName);
   }
 
   /**
-   * Delete a cached location with specific replicaId.
+   * Delete a cached location with specific replicaId.<br>
+   * Synchronized because of calls in cacheLocation which need to be executed atomically
    * @param tableName tableName
    * @param row       row key
    * @param replicaId region replica id
    */
-  public void clearCache(final TableName tableName, final byte[] row, int replicaId) {
+  public synchronized void clearCache(final TableName tableName, final byte[] row, int replicaId) {
     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
 
     RegionLocations regionLocations = getCachedLocation(tableName, row);
@@ -320,14 +340,12 @@ public class MetaCache {
     if (toBeRemoved != null) {
       RegionLocations updatedLocations = regionLocations.remove(replicaId);
       byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey();
-      boolean removed;
       if (updatedLocations.isEmpty()) {
-        removed = tableLocations.remove(startKey, regionLocations);
+        tableLocations.remove(startKey);
       } else {
-        removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
+        tableLocations.put(startKey, updatedLocations);
       }
 
-      if (removed) {
       if (metrics != null) {
         metrics.incrMetaCacheNumClearRegion();
       }
@@ -337,12 +355,13 @@ public class MetaCache {
       }
     }
   }
-  }
 
   /**
-   * Delete a cached location for a table, row and server
+   * Delete a cached location for a table, row and server. <br>
+   * Synchronized because of calls in cacheLocation which need to be executed atomically
    */
-  public void clearCache(final TableName tableName, final byte[] row, ServerName serverName) {
+  public synchronized void clearCache(final TableName tableName, final byte[] row,
+    ServerName serverName) {
     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
 
     RegionLocations regionLocations = getCachedLocation(tableName, row);
@@ -350,13 +369,11 @@ public class MetaCache {
       RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
       if (updatedLocations != regionLocations) {
         byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey();
-        boolean removed = false;
         if (updatedLocations.isEmpty()) {
-          removed = tableLocations.remove(startKey, regionLocations);
+          tableLocations.remove(startKey);
        } else {
-          removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
+          tableLocations.put(startKey, updatedLocations);
        }
-        if (removed) {
         if (metrics != null) {
           metrics.incrMetaCacheNumClearRegion();
         }
@@ -367,27 +384,25 @@ public class MetaCache {
       }
     }
   }
-  }
 
   /**
-   * Deletes the cached location of the region if necessary, based on some error from source.
+   * Deletes the cached location of the region if necessary, based on some error from source.<br>
+   * Synchronized because of calls in cacheLocation which need to be executed atomically
    * @param hri The region in question.
    */
-  public void clearCache(RegionInfo hri) {
+  public synchronized void clearCache(RegionInfo hri) {
     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(hri.getTable());
     RegionLocations regionLocations = tableLocations.get(hri.getStartKey());
     if (regionLocations != null) {
       HRegionLocation oldLocation = regionLocations.getRegionLocation(hri.getReplicaId());
       if (oldLocation == null) return;
       RegionLocations updatedLocations = regionLocations.remove(oldLocation);
-      boolean removed;
       if (updatedLocations != regionLocations) {
         if (updatedLocations.isEmpty()) {
-          removed = tableLocations.remove(hri.getStartKey(), regionLocations);
+          tableLocations.remove(hri.getStartKey());
        } else {
-          removed = tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations);
+          tableLocations.put(hri.getStartKey(), updatedLocations);
        }
-        if (removed) {
         if (metrics != null) {
           metrics.incrMetaCacheNumClearRegion();
         }
@@ -397,34 +412,5 @@ public class MetaCache {
       }
     }
   }
-  }
-
-  public void clearCache(final HRegionLocation location) {
-    if (location == null) {
-      return;
-    }
-    TableName tableName = location.getRegion().getTable();
-    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
-    RegionLocations regionLocations = tableLocations.get(location.getRegion().getStartKey());
-    if (regionLocations != null) {
-      RegionLocations updatedLocations = regionLocations.remove(location);
-      boolean removed;
-      if (updatedLocations != regionLocations) {
-        if (updatedLocations.isEmpty()) {
-          removed = tableLocations.remove(location.getRegion().getStartKey(), regionLocations);
-        } else {
-          removed = tableLocations.replace(location.getRegion().getStartKey(), regionLocations,
-            updatedLocations);
-        }
-        if (removed) {
-          if (metrics != null) {
-            metrics.incrMetaCacheNumClearRegion();
-          }
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Removed " + location + " from cache");
-          }
-        }
-      }
-    }
-  }
 }
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Util class to DRY common logic between AsyncRegionLocationCache and MetaCache
+ */
+@InterfaceAudience.Private
+final class MetaCacheUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MetaCacheUtil.class);
+
+  private MetaCacheUtil() {
+  }
+
+  /**
+   * When caching a location, the region may have been the result of a merge. Check to see if the
+   * region's boundaries overlap any other cached locations in a problematic way. Those would have
+   * been merge parents which no longer exist. We need to proactively clear them out to avoid a
+   * case where a merged region which receives no requests never gets cleared; that would cause
+   * requests for other merged regions after it to see the wrong cached location.
+   * <p>
+   * For example, if we have Start_New < Start_Old < End_Old < End_New and we only ever access
+   * rows in the range [End_Old, End_New], a lookup will always return the old region. The caller
+   * then finds the row is outside that region's range, fetches the new region, gets back
+   * [Start_New, End_New), and falls into the same situation again.
+   * <p>
+   * If Start_Old is less than Start_New, overlap is not a problem: when the row is greater than
+   * Start_New we locate to the new region, and when the row is less than Start_New it falls into
+   * the old region's range, the access fails with a NotServing exception, and we then clean the
+   * cache.
+   * <p>
+   * See HBASE-27650
+   * @param locations the new location that was just cached
+   */
+  static void cleanProblematicOverlappedRegions(RegionLocations locations,
+    ConcurrentNavigableMap<byte[], RegionLocations> cache) {
+    RegionInfo region = locations.getRegionLocation().getRegion();
+
+    boolean isLast = isEmptyStopRow(region.getEndKey());
+
+    while (true) {
+      Map.Entry<byte[], RegionLocations> overlap =
+        isLast ? cache.lastEntry() : cache.lowerEntry(region.getEndKey());
+      if (
+        overlap == null || overlap.getValue() == locations
+          || Bytes.equals(overlap.getKey(), region.getStartKey())
+      ) {
+        break;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          "Removing cached location {} (endKey={}) because it overlaps with "
+            + "new location {} (endKey={})",
+          overlap.getValue(),
+          Bytes.toStringBinary(overlap.getValue().getRegionLocation().getRegion().getEndKey()),
+          locations, Bytes.toStringBinary(locations.getRegionLocation().getRegion().getEndKey()));
+      }
+
+      cache.remove(overlap.getKey());
+    }
+  }
+}
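To make the pruning loop above concrete without standing up a cluster, here is a self-contained model of cleanProblematicOverlappedRegions over a String-keyed ConcurrentSkipListMap. The Region record, key values, and class name are stand-ins for RegionInfo/RegionLocations and HBase's byte[] start keys; only the control flow mirrors the real method.

import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Standalone sketch: walk backwards from the new region's end key, evicting
// every cached entry that overlaps it (stale merge parents), and stop at the
// new region itself or at a non-overlapping neighbor.
public class OverlapPruneSketch {
  record Region(String startKey, String endKey) {} // endKey "" == last region

  static void cleanProblematicOverlapped(Region newRegion,
    ConcurrentNavigableMap<String, Region> cache) {
    boolean isLast = newRegion.endKey().isEmpty();
    while (true) {
      // lowerEntry (strictly below the end key), not floorEntry: a neighbor
      // starting exactly at the end key does not overlap [startKey, endKey).
      Map.Entry<String, Region> overlap =
        isLast ? cache.lastEntry() : cache.lowerEntry(newRegion.endKey());
      if (overlap == null || overlap.getValue() == newRegion
        || overlap.getKey().equals(newRegion.startKey())) {
        break;
      }
      cache.remove(overlap.getKey()); // stale merge parent: evict
    }
  }

  public static void main(String[] args) {
    ConcurrentNavigableMap<String, Region> cache = new ConcurrentSkipListMap<>();
    cache.put("", new Region("", "b"));  // three merge parents left in the cache
    cache.put("b", new Region("b", "e"));
    cache.put("e", new Region("e", ""));
    Region merged = new Region("", "");  // merged region spans the whole table
    cache.put(merged.startKey(), merged);
    cleanProblematicOverlapped(merged, cache);
    System.out.println(cache); // only the merged region remains
  }
}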
@@ -558,6 +558,10 @@ public final class MetricsConnection implements StatisticTrackable {
     metaCacheMisses.inc();
   }
 
+  public long getMetaCacheMisses() {
+    return metaCacheMisses.getCount();
+  }
+
   /** Increment the number of meta cache drops requested for entire RegionServer. */
   public void incrMetaCacheNumClearServer() {
     metaCacheNumClearServer.inc();
@@ -76,7 +76,8 @@ public class TestAsyncTableLocatePrefetch {
     // confirm that the locations of all the regions have been cached.
     assertNotNull(LOCATOR.getRegionLocationInCache(TABLE_NAME, Bytes.toBytes("aaa")));
     for (byte[] row : HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE) {
-      assertNotNull(LOCATOR.getRegionLocationInCache(TABLE_NAME, row));
+      assertNotNull("Expected location to not be null for " + Bytes.toStringBinary(row),
+        LOCATOR.getRegionLocationInCache(TABLE_NAME, row));
     }
   }
 }
@@ -26,6 +26,10 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -47,10 +51,12 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.function.ThrowingRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,6 +108,128 @@ public class TestMetaCache {
     TEST_UTIL.shutdownMiniCluster();
   }
 
+  @Test
+  public void testMergeEmptyWithMetaCache() throws Throwable {
+    TableName tableName = TableName.valueOf("MergeEmpty");
+    byte[] family = Bytes.toBytes("CF");
+    TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
+    TEST_UTIL.getAdmin().createTable(td, new byte[][] { Bytes.toBytes(2), Bytes.toBytes(5) });
+    TEST_UTIL.waitTableAvailable(tableName);
+    TEST_UTIL.waitUntilNoRegionsInTransition();
+    RegionInfo regionA = null;
+    RegionInfo regionB = null;
+    RegionInfo regionC = null;
+    for (RegionInfo region : TEST_UTIL.getAdmin().getRegions(tableName)) {
+      if (region.getStartKey().length == 0) {
+        regionA = region;
+      } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(2))) {
+        regionB = region;
+      } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(5))) {
+        regionC = region;
+      }
+    }
+
+    assertNotNull(regionA);
+    assertNotNull(regionB);
+    assertNotNull(regionC);
+
+    TEST_UTIL.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY,
+      true);
+    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+      AsyncConnection asyncConn =
+        ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
+      ConnectionImplementation connImpl = (ConnectionImplementation) conn;
+      AsyncConnectionImpl asyncConnImpl = (AsyncConnectionImpl) asyncConn;
+
+      MetricsConnection metrics = connImpl.getConnectionMetrics();
+      MetricsConnection asyncMetrics = asyncConnImpl.getConnectionMetrics().get();
+
+      // warm meta cache
+      conn.getRegionLocator(tableName).getAllRegionLocations();
+      asyncConn.getRegionLocator(tableName).getAllRegionLocations().get();
+
+      Assert.assertEquals(3, TEST_UTIL.getAdmin().getRegions(tableName).size());
+
+      // Merge the 3 regions into one
+      TEST_UTIL.getAdmin().mergeRegionsAsync(
+        new byte[][] { regionA.getRegionName(), regionB.getRegionName(), regionC.getRegionName() },
+        false).get(30, TimeUnit.SECONDS);
+
+      Assert.assertEquals(1, TEST_UTIL.getAdmin().getRegions(tableName).size());
+
+      Table table = conn.getTable(tableName);
+      AsyncTable<?> asyncTable = asyncConn.getTable(tableName);
+
+      // This request should cause us to cache the newly merged region.
+      // As part of caching that region, it should clear out any cached merge parent regions which
+      // are overlapped by the new region. That way, subsequent calls below won't fall into the
+      // bug in HBASE-27650. Otherwise, a request for row 6 would always get stuck on cached
+      // regionB and we'd continue to see cache misses below.
+      assertTrue(executeAndGetNewMisses(() -> table.get(new Get(Bytes.toBytes(6))), metrics) > 0);
+      assertTrue(
+        executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(), asyncMetrics)
+          > 0);
+
+      // We verify no new cache misses here, which proves the cache was fixed up above
+      assertEquals(0, executeAndGetNewMisses(() -> table.get(new Get(Bytes.toBytes(6))), metrics));
+      assertEquals(0, executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(),
+        asyncMetrics));
+    }
+  }
+
+  private long executeAndGetNewMisses(ThrowingRunnable runnable, MetricsConnection metrics)
+    throws Throwable {
+    long lastVal = metrics.getMetaCacheMisses();
+    runnable.run();
+    long curVal = metrics.getMetaCacheMisses();
+    return curVal - lastVal;
+  }
+
+  /**
+   * Test that cleanProblematicOverlappedRegions doesn't incorrectly remove regions from the
+   * cache. Originally encountered when using floorEntry rather than lowerEntry.
+   */
+  @Test
+  public void testAddToCacheReverse() throws IOException, InterruptedException, ExecutionException {
+    try (
+      AsyncConnectionImpl asyncConn = (AsyncConnectionImpl) ConnectionFactory
+        .createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+      ConnectionImplementation conn = (ConnectionImplementation) ConnectionFactory
+        .createConnection(TEST_UTIL.getConfiguration())) {
+
+      AsyncNonMetaRegionLocator asyncLocator = asyncConn.getLocator().getNonMetaRegionLocator();
+
+      TableName tableName = TableName.valueOf("testAddToCache");
+      byte[] family = Bytes.toBytes("CF");
+      TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
+      int maxSplits = 10;
+      List<byte[]> splits =
+        IntStream.range(1, maxSplits).mapToObj(Bytes::toBytes).collect(Collectors.toList());
+
+      TEST_UTIL.getAdmin().createTable(td, splits.toArray(new byte[0][]));
+      TEST_UTIL.waitTableAvailable(tableName);
+      TEST_UTIL.waitUntilNoRegionsInTransition();
+      conn.getRegionLocator(tableName);
+
+      assertEquals(splits.size() + 1, TEST_UTIL.getAdmin().getRegions(tableName).size());
+
+      RegionLocator locatorForTable = conn.getRegionLocator(tableName);
+      AsyncTableRegionLocator asyncLocatorForTable = asyncConn.getRegionLocator(tableName);
+      for (int i = maxSplits; i >= 0; i--) {
+        locatorForTable.getRegionLocation(Bytes.toBytes(i));
+        asyncLocatorForTable.getRegionLocation(Bytes.toBytes(i));
+      }
+
+      for (int i = 0; i < maxSplits; i++) {
+        assertNotNull(asyncLocator.getRegionLocationInCache(tableName, Bytes.toBytes(i)));
+        assertNotNull(conn.getCachedLocation(tableName, Bytes.toBytes(i)));
+      }
+    }
+  }
+
   @Test
   public void testPreserveMetaCacheOnException() throws Exception {
     ((FakeRSRpcServices) badRS.getRSRpcServices())
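A final note on the floorEntry-vs-lowerEntry distinction that testAddToCacheReverse above guards: floorEntry(endKey) can return a neighbor that starts exactly at the new region's end key, and since region ranges are half-open [startKey, endKey), that neighbor does not actually overlap and must not be evicted. A tiny illustration follows; the String keys and class name are mine, not HBase's.

import java.util.TreeMap;

// Illustrative String-keyed cache of region ranges, keyed by start key.
public class LowerVsFloor {
  public static void main(String[] args) {
    TreeMap<String, String> cache = new TreeMap<>();
    cache.put("a", "[a,m)"); // region just cached, end key "m"
    cache.put("m", "[m,z)"); // adjacent region starting exactly at that end key
    String newEndKey = "m";
    // floorEntry hands back the adjacent, non-overlapping region:
    System.out.println(cache.floorEntry(newEndKey)); // m=[m,z)  (wrong to evict)
    // lowerEntry returns the newly cached region itself, so the loop stops:
    System.out.println(cache.lowerEntry(newEndKey)); // a=[a,m)
  }
}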