HBASE-27650 Merging empty regions corrupts meta cache (#5037)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Bryan Beaudreault 2023-02-26 16:39:48 -05:00 committed by GitHub
parent 4a9cf99b2f
commit f20efafbc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 473 additions and 187 deletions

View File

@ -23,15 +23,12 @@ import static org.apache.hadoop.hbase.HConstants.NINES;
import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
import static org.apache.hadoop.hbase.HConstants.ZEROES;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import java.io.IOException;
@ -47,8 +44,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
@ -66,8 +61,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Objects;
/**
* The asynchronous locator for regions other than meta.
*/
@ -146,13 +139,15 @@ class AsyncNonMetaRegionLocator {
private static final class TableCache {
private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
private final Set<LocateRequest> pendingRequests = new HashSet<>();
private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
new LinkedHashMap<>();
private final AsyncRegionLocationCache regionLocationCache;
public TableCache(TableName tableName) {
regionLocationCache = new AsyncRegionLocationCache(tableName);
}
public boolean hasQuota(int max) {
return pendingRequests.size() < max;
@ -261,76 +256,7 @@ class AsyncNonMetaRegionLocator {
}
private TableCache getTableCache(TableName tableName) {
return computeIfAbsent(cache, tableName, TableCache::new);
}
private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
HRegionLocation[] locArr1 = locs1.getRegionLocations();
HRegionLocation[] locArr2 = locs2.getRegionLocations();
if (locArr1.length != locArr2.length) {
return false;
}
for (int i = 0; i < locArr1.length; i++) {
// do not need to compare region info
HRegionLocation loc1 = locArr1[i];
HRegionLocation loc2 = locArr2[i];
if (loc1 == null) {
if (loc2 != null) {
return false;
}
} else {
if (loc2 == null) {
return false;
}
if (loc1.getSeqNum() != loc2.getSeqNum()) {
return false;
}
if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
return false;
}
}
}
return true;
}
// if we successfully add the locations to cache, return the locations, otherwise return the one
// which prevents us being added. The upper layer can use this value to complete pending requests.
private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) {
LOG.trace("Try adding {} to cache", locs);
byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
for (;;) {
RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
if (oldLocs == null) {
return locs;
}
// check whether the regions are the same, this usually happens when table is split/merged, or
// deleted and recreated again.
RegionInfo region = locs.getRegionLocation().getRegion();
RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
if (isEqual(mergedLocs, oldLocs)) {
// the merged one is the same with the old one, give up
LOG.trace("Will not add {} to cache because the old value {} "
+ " is newer than us or has the same server name."
+ " Maybe it is updated before we replace it", locs, oldLocs);
return oldLocs;
}
if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
return mergedLocs;
}
} else {
// the region is different, here we trust the one we fetched. This maybe wrong but finally
// the upper layer can detect this and trigger removal of the wrong locations
if (LOG.isDebugEnabled()) {
LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}',"
+ " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
}
if (tableCache.cache.replace(startKey, oldLocs, locs)) {
return locs;
}
}
}
return computeIfAbsent(cache, tableName, () -> new TableCache(tableName));
}
private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
@ -342,7 +268,7 @@ class AsyncNonMetaRegionLocator {
Optional<LocateRequest> toSend = Optional.empty();
TableCache tableCache = getTableCache(tableName);
if (locs != null) {
RegionLocations addedLocs = addToCache(tableCache, locs);
RegionLocations addedLocs = tableCache.regionLocationCache.add(locs);
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
@ -420,62 +346,24 @@ class AsyncNonMetaRegionLocator {
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
}
private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
int replicaId) {
Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
if (entry == null) {
private RegionLocations locateRowInCache(TableCache tableCache, byte[] row, int replicaId) {
RegionLocations locs = tableCache.regionLocationCache.findForRow(row, replicaId);
if (locs == null) {
recordCacheMiss();
return null;
}
RegionLocations locs = entry.getValue();
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
recordCacheMiss();
return null;
}
byte[] endKey = loc.getRegion().getEndKey();
if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
}
recordCacheHit();
return locs;
} else {
recordCacheMiss();
return null;
recordCacheHit();
}
return locs;
}
private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
byte[] row, int replicaId) {
boolean isEmptyStopRow = isEmptyStopRow(row);
Map.Entry<byte[], RegionLocations> entry =
isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
if (entry == null) {
private RegionLocations locateRowBeforeInCache(TableCache tableCache, byte[] row, int replicaId) {
RegionLocations locs = tableCache.regionLocationCache.findForBeforeRow(row, replicaId);
if (locs == null) {
recordCacheMiss();
return null;
}
RegionLocations locs = entry.getValue();
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
recordCacheMiss();
return null;
}
if (
isEmptyStopRow(loc.getRegion().getEndKey())
|| (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)
) {
if (LOG.isTraceEnabled()) {
LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
}
recordCacheHit();
return locs;
} else {
recordCacheMiss();
return null;
recordCacheHit();
}
return locs;
}
private void locateInMeta(TableName tableName, LocateRequest req) {
@ -569,7 +457,7 @@ class AsyncNonMetaRegionLocator {
if (info == null || info.isOffline() || info.isSplitParent()) {
continue;
}
RegionLocations addedLocs = addToCache(tableCache, locs);
RegionLocations addedLocs = tableCache.regionLocationCache.add(locs);
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
@ -581,11 +469,11 @@ class AsyncNonMetaRegionLocator {
});
}
private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
int replicaId, RegionLocateType locateType) {
private RegionLocations locateInCache(TableCache tableCache, byte[] row, int replicaId,
RegionLocateType locateType) {
return locateType.equals(RegionLocateType.BEFORE)
? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
: locateRowInCache(tableCache, tableName, row, replicaId);
? locateRowBeforeInCache(tableCache, row, replicaId)
: locateRowInCache(tableCache, row, replicaId);
}
// locateToPrevious is true means we will use the start key of a region to locate the region
@ -597,7 +485,7 @@ class AsyncNonMetaRegionLocator {
assert !locateType.equals(RegionLocateType.AFTER);
TableCache tableCache = getTableCache(tableName);
if (!reload) {
RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType);
if (isGood(locs, replicaId)) {
return CompletableFuture.completedFuture(locs);
}
@ -608,7 +496,7 @@ class AsyncNonMetaRegionLocator {
synchronized (tableCache) {
// check again
if (!reload) {
RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType);
if (isGood(locs, replicaId)) {
return CompletableFuture.completedFuture(locs);
}
@ -647,43 +535,25 @@ class AsyncNonMetaRegionLocator {
private void removeLocationFromCache(HRegionLocation loc) {
TableCache tableCache = cache.get(loc.getRegion().getTable());
if (tableCache == null) {
return;
}
byte[] startKey = loc.getRegion().getStartKey();
for (;;) {
RegionLocations oldLocs = tableCache.cache.get(startKey);
if (oldLocs == null) {
return;
}
HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
if (!canUpdateOnError(loc, oldLoc)) {
return;
}
// Tell metaReplicaSelector that the location is stale. It will create a stale entry
// with timestamp internally. Next time the client looks up the same location,
// it will pick a different meta replica region.
if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
metaReplicaSelector.onError(loc);
}
RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
if (newLocs == null) {
if (tableCache.cache.remove(startKey, oldLocs)) {
recordClearRegionCache();
return;
}
} else {
if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
recordClearRegionCache();
return;
}
if (tableCache != null) {
if (tableCache.regionLocationCache.remove(loc)) {
recordClearRegionCache();
updateMetaReplicaSelector(loc);
}
}
}
private void updateMetaReplicaSelector(HRegionLocation loc) {
// Tell metaReplicaSelector that the location is stale. It will create a stale entry
// with timestamp internally. Next time the client looks up the same location,
// it will pick a different meta replica region.
if (metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
metaReplicaSelector.onError(loc);
}
}
void addLocationToCache(HRegionLocation loc) {
addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
getTableCache(loc.getRegion().getTable()).regionLocationCache.add(createRegionLocations(loc));
}
private HRegionLocation getCachedLocation(HRegionLocation loc) {
@ -691,7 +561,7 @@ class AsyncNonMetaRegionLocator {
if (tableCache == null) {
return null;
}
RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey());
RegionLocations locs = tableCache.regionLocationCache.get(loc.getRegion().getStartKey());
return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
}
@ -716,8 +586,8 @@ class AsyncNonMetaRegionLocator {
}
}
futureResultList.forEach(RegionLocationsFutureResult::complete);
conn.getConnectionMetrics()
.ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
conn.getConnectionMetrics().ifPresent(
metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.regionLocationCache.size()));
}
void clearCache() {
@ -726,19 +596,7 @@ class AsyncNonMetaRegionLocator {
void clearCache(ServerName serverName) {
for (TableCache tableCache : cache.values()) {
for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) {
byte[] regionName = entry.getKey();
RegionLocations locs = entry.getValue();
RegionLocations newLocs = locs.removeByServer(serverName);
if (locs == newLocs) {
continue;
}
if (newLocs.isEmpty()) {
tableCache.cache.remove(regionName, locs);
} else {
tableCache.cache.replace(regionName, locs, newLocs);
}
}
tableCache.regionLocationCache.removeForServer(serverName);
}
}
@ -748,7 +606,7 @@ class AsyncNonMetaRegionLocator {
if (tableCache == null) {
return null;
}
return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
return locateRowInCache(tableCache, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
// only used for testing whether we have cached the location for a table.
@ -757,6 +615,7 @@ class AsyncNonMetaRegionLocator {
if (tableCache == null) {
return 0;
}
return tableCache.cache.values().stream().mapToInt(RegionLocations::numNonNullElements).sum();
return tableCache.regionLocationCache.getAll().stream()
.mapToInt(RegionLocations::numNonNullElements).sum();
}
}

View File

@ -0,0 +1,316 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Objects;
/**
* Cache of RegionLocations for use by {@link AsyncNonMetaRegionLocator}. Wrapper around
* ConcurrentSkipListMap ensuring proper access to cached items. Updates are synchronized, but reads
* are not.
*/
final class AsyncRegionLocationCache {
private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocationCache.class);
private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
private final TableName tableName;
public AsyncRegionLocationCache(TableName tableName) {
this.tableName = tableName;
}
/**
* Add the given locations to the cache, merging with existing if necessary. Also cleans out any
* previously cached locations which may have been superseded by this one (i.e. in case of merged
* regions). See {@link #cleanProblematicOverlappedRegions(RegionLocations)}
* @param locs the locations to cache
* @return the final location (possibly merged) that was added to the cache
*/
public synchronized RegionLocations add(RegionLocations locs) {
byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
RegionLocations oldLocs = cache.putIfAbsent(startKey, locs);
if (oldLocs == null) {
cleanProblematicOverlappedRegions(locs);
return locs;
}
// check whether the regions are the same, this usually happens when table is split/merged,
// or deleted and recreated again.
RegionInfo region = locs.getRegionLocation().getRegion();
RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
if (isEqual(mergedLocs, oldLocs)) {
// the merged one is the same with the old one, give up
LOG.trace("Will not add {} to cache because the old value {} "
+ " is newer than us or has the same server name."
+ " Maybe it is updated before we replace it", locs, oldLocs);
return oldLocs;
}
locs = mergedLocs;
} else {
// the region is different, here we trust the one we fetched. This maybe wrong but finally
// the upper layer can detect this and trigger removal of the wrong locations
if (LOG.isDebugEnabled()) {
LOG.debug("The newly fetch region {} is different from the old one {} for row '{}',"
+ " try replaying the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
}
}
cache.put(startKey, locs);
cleanProblematicOverlappedRegions(locs);
return locs;
}
/**
* When caching a location, the region may have been the result of a merge. Check to see if the
* region's boundaries overlap any other cached locations in a problematic way. Those would have
* been merge parents which no longer exist. We need to proactively clear them out to avoid a case
* where a merged region which receives no requests never gets cleared. This causes requests to
* other merged regions after it to see the wrong cached location.
* <p>
* For example, if we have Start_New < Start_Old < End_Old < End_New, then if we only access
* within range [End_Old, End_New], then it will always return the old region but it will then
* find out the row is not in the range, and try to get the new region, and then we get
* [Start_New, End_New), still fall into the same situation.
* <p>
* If Start_Old is less than Start_New, even if we have overlap, it is not a problem, as when the
* row is greater than Start_New, we will locate to the new region, and if the row is less than
* Start_New, it will fall into the old region's range and we will try to access the region and
* get a NotServing exception, and then we will clean the cache.
* <p>
* See HBASE-27650
* @param locations the new location that was just cached
*/
private void cleanProblematicOverlappedRegions(RegionLocations locations) {
RegionInfo region = locations.getRegionLocation().getRegion();
boolean isLast = isEmptyStopRow(region.getEndKey());
while (true) {
Map.Entry<byte[], RegionLocations> overlap =
isLast ? cache.lastEntry() : cache.lowerEntry(region.getEndKey());
if (
overlap == null || overlap.getValue() == locations
|| Bytes.equals(overlap.getKey(), region.getStartKey())
) {
break;
}
if (LOG.isInfoEnabled()) {
LOG.info(
"Removing cached location {} (endKey={}) because it overlaps with "
+ "new location {} (endKey={})",
overlap.getValue(),
Bytes.toStringBinary(overlap.getValue().getRegionLocation().getRegion().getEndKey()),
locations, Bytes.toStringBinary(locations.getRegionLocation().getRegion().getEndKey()));
}
cache.remove(overlap.getKey());
}
}
private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
HRegionLocation[] locArr1 = locs1.getRegionLocations();
HRegionLocation[] locArr2 = locs2.getRegionLocations();
if (locArr1.length != locArr2.length) {
return false;
}
for (int i = 0; i < locArr1.length; i++) {
// do not need to compare region info
HRegionLocation loc1 = locArr1[i];
HRegionLocation loc2 = locArr2[i];
if (loc1 == null) {
if (loc2 != null) {
return false;
}
} else {
if (loc2 == null) {
return false;
}
if (loc1.getSeqNum() != loc2.getSeqNum()) {
return false;
}
if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
return false;
}
}
}
return true;
}
/**
* Returns all cached RegionLocations
*/
public Collection<RegionLocations> getAll() {
return Collections.unmodifiableCollection(cache.values());
}
/**
* Gets the RegionLocations for a given region's startKey. This is a direct lookup, if the key
* does not exist in the cache it will return null.
* @param startKey region start key to directly look up
*/
public RegionLocations get(byte[] startKey) {
return cache.get(startKey);
}
/**
* Finds the RegionLocations for the region with the greatest startKey less than or equal to the
* given row
* @param row row to find locations
*/
public RegionLocations findForRow(byte[] row, int replicaId) {
Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
if (entry == null) {
return null;
}
RegionLocations locs = entry.getValue();
if (locs == null) {
return null;
}
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
return null;
}
byte[] endKey = loc.getRegion().getEndKey();
if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
}
return locs;
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Requested row {} comes after region end key of {} for cached location {}",
Bytes.toStringBinary(row), Bytes.toStringBinary(endKey), locs);
}
return null;
}
}
/**
* Finds the RegionLocations for the region with the greatest startKey strictly less than the
* given row
* @param row row to find locations
*/
public RegionLocations findForBeforeRow(byte[] row, int replicaId) {
boolean isEmptyStopRow = isEmptyStopRow(row);
Map.Entry<byte[], RegionLocations> entry =
isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
if (entry == null) {
return null;
}
RegionLocations locs = entry.getValue();
if (locs == null) {
return null;
}
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
return null;
}
if (
isEmptyStopRow(loc.getRegion().getEndKey())
|| (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)
) {
if (LOG.isTraceEnabled()) {
LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
}
return locs;
} else {
return null;
}
}
/**
* Removes the location from the cache if it exists and can be removed.
* @return true if entry was removed
*/
public synchronized boolean remove(HRegionLocation loc) {
byte[] startKey = loc.getRegion().getStartKey();
RegionLocations oldLocs = cache.get(startKey);
if (oldLocs == null) {
return false;
}
HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
if (!canUpdateOnError(loc, oldLoc)) {
return false;
}
RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
if (newLocs == null) {
if (cache.remove(startKey, oldLocs)) {
return true;
}
} else {
cache.put(startKey, newLocs);
return true;
}
return false;
}
/**
* Returns the size of the region locations cache
*/
public int size() {
return cache.size();
}
/**
* Removes serverName from all locations in the cache, fully removing any RegionLocations which
* are empty after removing the server from it.
* @param serverName server to remove from locations
*/
public synchronized void removeForServer(ServerName serverName) {
for (Map.Entry<byte[], RegionLocations> entry : cache.entrySet()) {
byte[] regionName = entry.getKey();
RegionLocations locs = entry.getValue();
RegionLocations newLocs = locs.removeByServer(serverName);
if (locs == newLocs) {
continue;
}
if (newLocs.isEmpty()) {
cache.remove(regionName, locs);
} else {
cache.put(regionName, newLocs);
}
}
}
}

View File

@ -557,6 +557,10 @@ public final class MetricsConnection implements StatisticTrackable {
metaCacheMisses.inc();
}
public long getMetaCacheMisses() {
return metaCacheMisses.getCount();
}
/** Increment the number of meta cache drops requested for entire RegionServer. */
public void incrMetaCacheNumClearServer() {
metaCacheNumClearServer.inc();

View File

@ -76,7 +76,8 @@ public class TestAsyncTableLocatePrefetch {
// confirm that the locations of all the regions have been cached.
assertNotNull(LOCATOR.getRegionLocationInCache(TABLE_NAME, Bytes.toBytes("aaa")));
for (byte[] row : HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE) {
assertNotNull(LOCATOR.getRegionLocationInCache(TABLE_NAME, row));
assertNotNull("Expected location to not be null for " + Bytes.toStringBinary(row),
LOCATOR.getRegionLocationInCache(TABLE_NAME, row));
}
}
}

View File

@ -26,6 +26,9 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -50,6 +53,7 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.function.ThrowingRunnable;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
@ -112,6 +116,108 @@ public class TestMetaCache {
metrics = asyncConn.getConnectionMetrics().get();
}
/**
* Test that our cleanOverlappingRegions doesn't incorrectly remove regions from cache. Originally
* encountered when using floorEntry rather than lowerEntry.
*/
@Test
public void testAddToCacheReverse() throws IOException, InterruptedException {
setupConnection(1);
TableName tableName = TableName.valueOf("testAddToCache");
byte[] family = Bytes.toBytes("CF");
TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
int maxSplits = 10;
List<byte[]> splits =
IntStream.range(1, maxSplits).mapToObj(Bytes::toBytes).collect(Collectors.toList());
TEST_UTIL.getAdmin().createTable(td, splits.toArray(new byte[0][]));
TEST_UTIL.waitTableAvailable(tableName);
TEST_UTIL.waitUntilNoRegionsInTransition();
assertEquals(splits.size() + 1, TEST_UTIL.getAdmin().getRegions(tableName).size());
RegionLocator locatorForTable = conn.getRegionLocator(tableName);
for (int i = maxSplits; i >= 0; i--) {
locatorForTable.getRegionLocation(Bytes.toBytes(i));
}
for (int i = 0; i < maxSplits; i++) {
assertNotNull(locator.getRegionLocationInCache(tableName, Bytes.toBytes(i)));
}
}
@Test
public void testMergeEmptyWithMetaCache() throws Throwable {
TableName tableName = TableName.valueOf("testMergeEmptyWithMetaCache");
byte[] family = Bytes.toBytes("CF");
TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
TEST_UTIL.getAdmin().createTable(td, new byte[][] { Bytes.toBytes(2), Bytes.toBytes(5) });
TEST_UTIL.waitTableAvailable(tableName);
TEST_UTIL.waitUntilNoRegionsInTransition();
RegionInfo regionA = null;
RegionInfo regionB = null;
RegionInfo regionC = null;
for (RegionInfo region : TEST_UTIL.getAdmin().getRegions(tableName)) {
if (region.getStartKey().length == 0) {
regionA = region;
} else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(2))) {
regionB = region;
} else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(5))) {
regionC = region;
}
}
assertNotNull(regionA);
assertNotNull(regionB);
assertNotNull(regionC);
TEST_UTIL.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY,
true);
try (AsyncConnection asyncConn =
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
AsyncConnectionImpl asyncConnImpl = (AsyncConnectionImpl) asyncConn;
MetricsConnection asyncMetrics = asyncConnImpl.getConnectionMetrics().get();
// warm meta cache
asyncConn.getRegionLocator(tableName).getAllRegionLocations().get();
assertEquals(3, TEST_UTIL.getAdmin().getRegions(tableName).size());
// Merge the 3 regions into one
TEST_UTIL.getAdmin().mergeRegionsAsync(
new byte[][] { regionA.getRegionName(), regionB.getRegionName(), regionC.getRegionName() },
false).get(30, TimeUnit.SECONDS);
assertEquals(1, TEST_UTIL.getAdmin().getRegions(tableName).size());
AsyncTable<?> asyncTable = asyncConn.getTable(tableName);
// This request should cause us to cache the newly merged region.
// As part of caching that region, it should clear out any cached merge parent regions which
// are overlapped by the new region. That way, subsequent calls below won't fall into the
// bug in HBASE-27650. Otherwise, a request for row 6 would always get stuck on cached
// regionB and we'd continue to see cache misses below.
assertTrue(
executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(), asyncMetrics)
> 0);
// We verify no new cache misses here due to above, which proves we've fixed up the cache
assertEquals(0, executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(),
asyncMetrics));
}
}
private long executeAndGetNewMisses(ThrowingRunnable runnable, MetricsConnection metrics)
throws Throwable {
long lastVal = metrics.getMetaCacheMisses();
runnable.run();
long curVal = metrics.getMetaCacheMisses();
return curVal - lastVal;
}
@Test
public void testPreserveMetaCacheOnException() throws Exception {
((FakeRSRpcServices) badRS.getRSRpcServices())