diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index 0009415142c..4c770dc914a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -23,15 +23,12 @@ import static org.apache.hadoop.hbase.HConstants.NINES; import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS; import static org.apache.hadoop.hbase.HConstants.ZEROES; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; -import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood; -import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation; import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName; import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE; -import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; import java.io.IOException; @@ -47,8 +44,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.ObjectUtils; import org.apache.hadoop.hbase.CatalogReplicaMode; @@ -66,8 +61,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.base.Objects; - /** * The asynchronous locator for regions other than meta. */ @@ -146,13 +139,15 @@ class AsyncNonMetaRegionLocator { private static final class TableCache { - private final ConcurrentNavigableMap cache = - new ConcurrentSkipListMap<>(BYTES_COMPARATOR); - private final Set pendingRequests = new HashSet<>(); private final Map> allRequests = new LinkedHashMap<>(); + private final AsyncRegionLocationCache regionLocationCache; + + public TableCache(TableName tableName) { + regionLocationCache = new AsyncRegionLocationCache(tableName); + } public boolean hasQuota(int max) { return pendingRequests.size() < max; @@ -262,76 +257,7 @@ class AsyncNonMetaRegionLocator { } private TableCache getTableCache(TableName tableName) { - return computeIfAbsent(cache, tableName, TableCache::new); - } - - private boolean isEqual(RegionLocations locs1, RegionLocations locs2) { - HRegionLocation[] locArr1 = locs1.getRegionLocations(); - HRegionLocation[] locArr2 = locs2.getRegionLocations(); - if (locArr1.length != locArr2.length) { - return false; - } - for (int i = 0; i < locArr1.length; i++) { - // do not need to compare region info - HRegionLocation loc1 = locArr1[i]; - HRegionLocation loc2 = locArr2[i]; - if (loc1 == null) { - if (loc2 != null) { - return false; - } - } else { - if (loc2 == null) { - return false; - } - if (loc1.getSeqNum() != loc2.getSeqNum()) { - return false; - } - if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) { - return false; - } - } - } - return true; - } - - // if we successfully add the locations to cache, return the locations, otherwise return the one - // which prevents us being added. The upper layer can use this value to complete pending requests. - private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) { - LOG.trace("Try adding {} to cache", locs); - byte[] startKey = locs.getRegionLocation().getRegion().getStartKey(); - for (;;) { - RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs); - if (oldLocs == null) { - return locs; - } - // check whether the regions are the same, this usually happens when table is split/merged, or - // deleted and recreated again. - RegionInfo region = locs.getRegionLocation().getRegion(); - RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion(); - if (region.getEncodedName().equals(oldRegion.getEncodedName())) { - RegionLocations mergedLocs = oldLocs.mergeLocations(locs); - if (isEqual(mergedLocs, oldLocs)) { - // the merged one is the same with the old one, give up - LOG.trace("Will not add {} to cache because the old value {} " - + " is newer than us or has the same server name." - + " Maybe it is updated before we replace it", locs, oldLocs); - return oldLocs; - } - if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) { - return mergedLocs; - } - } else { - // the region is different, here we trust the one we fetched. This maybe wrong but finally - // the upper layer can detect this and trigger removal of the wrong locations - if (LOG.isDebugEnabled()) { - LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}'," - + " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey)); - } - if (tableCache.cache.replace(startKey, oldLocs, locs)) { - return locs; - } - } - } + return computeIfAbsent(cache, tableName, () -> new TableCache(tableName)); } private void complete(TableName tableName, LocateRequest req, RegionLocations locs, @@ -343,7 +269,7 @@ class AsyncNonMetaRegionLocator { Optional toSend = Optional.empty(); TableCache tableCache = getTableCache(tableName); if (locs != null) { - RegionLocations addedLocs = addToCache(tableCache, locs); + RegionLocations addedLocs = tableCache.regionLocationCache.add(locs); List futureResultList = new ArrayList<>(); synchronized (tableCache) { tableCache.pendingRequests.remove(req); @@ -421,62 +347,24 @@ class AsyncNonMetaRegionLocator { conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss); } - private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row, - int replicaId) { - Map.Entry entry = tableCache.cache.floorEntry(row); - if (entry == null) { + private RegionLocations locateRowInCache(TableCache tableCache, byte[] row, int replicaId) { + RegionLocations locs = tableCache.regionLocationCache.findForRow(row, replicaId); + if (locs == null) { recordCacheMiss(); - return null; - } - RegionLocations locs = entry.getValue(); - HRegionLocation loc = locs.getRegionLocation(replicaId); - if (loc == null) { - recordCacheMiss(); - return null; - } - byte[] endKey = loc.getRegion().getEndKey(); - if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) { - if (LOG.isTraceEnabled()) { - LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, - Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId); - } - recordCacheHit(); - return locs; } else { - recordCacheMiss(); - return null; + recordCacheHit(); } + return locs; } - private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName, - byte[] row, int replicaId) { - boolean isEmptyStopRow = isEmptyStopRow(row); - Map.Entry entry = - isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row); - if (entry == null) { + private RegionLocations locateRowBeforeInCache(TableCache tableCache, byte[] row, int replicaId) { + RegionLocations locs = tableCache.regionLocationCache.findForBeforeRow(row, replicaId); + if (locs == null) { recordCacheMiss(); - return null; - } - RegionLocations locs = entry.getValue(); - HRegionLocation loc = locs.getRegionLocation(replicaId); - if (loc == null) { - recordCacheMiss(); - return null; - } - if ( - isEmptyStopRow(loc.getRegion().getEndKey()) - || (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0) - ) { - if (LOG.isTraceEnabled()) { - LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, - Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId); - } - recordCacheHit(); - return locs; } else { - recordCacheMiss(); - return null; + recordCacheHit(); } + return locs; } private void locateInMeta(TableName tableName, LocateRequest req) { @@ -570,7 +458,7 @@ class AsyncNonMetaRegionLocator { if (info == null || info.isOffline() || info.isSplitParent()) { continue; } - RegionLocations addedLocs = addToCache(tableCache, locs); + RegionLocations addedLocs = tableCache.regionLocationCache.add(locs); List futureResultList = new ArrayList<>(); synchronized (tableCache) { futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs)); @@ -582,11 +470,11 @@ class AsyncNonMetaRegionLocator { }); } - private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row, - int replicaId, RegionLocateType locateType) { + private RegionLocations locateInCache(TableCache tableCache, byte[] row, int replicaId, + RegionLocateType locateType) { return locateType.equals(RegionLocateType.BEFORE) - ? locateRowBeforeInCache(tableCache, tableName, row, replicaId) - : locateRowInCache(tableCache, tableName, row, replicaId); + ? locateRowBeforeInCache(tableCache, row, replicaId) + : locateRowInCache(tableCache, row, replicaId); } // locateToPrevious is true means we will use the start key of a region to locate the region @@ -598,7 +486,7 @@ class AsyncNonMetaRegionLocator { assert !locateType.equals(RegionLocateType.AFTER); TableCache tableCache = getTableCache(tableName); if (!reload) { - RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType); + RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType); if (isGood(locs, replicaId)) { return CompletableFuture.completedFuture(locs); } @@ -609,7 +497,7 @@ class AsyncNonMetaRegionLocator { synchronized (tableCache) { // check again if (!reload) { - RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType); + RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType); if (isGood(locs, replicaId)) { return CompletableFuture.completedFuture(locs); } @@ -648,43 +536,25 @@ class AsyncNonMetaRegionLocator { private void removeLocationFromCache(HRegionLocation loc) { TableCache tableCache = cache.get(loc.getRegion().getTable()); - if (tableCache == null) { - return; - } - byte[] startKey = loc.getRegion().getStartKey(); - for (;;) { - RegionLocations oldLocs = tableCache.cache.get(startKey); - if (oldLocs == null) { - return; - } - HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId()); - if (!canUpdateOnError(loc, oldLoc)) { - return; - } - // Tell metaReplicaSelector that the location is stale. It will create a stale entry - // with timestamp internally. Next time the client looks up the same location, - // it will pick a different meta replica region. - if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) { - metaReplicaSelector.onError(loc); - } - - RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId()); - if (newLocs == null) { - if (tableCache.cache.remove(startKey, oldLocs)) { - recordClearRegionCache(); - return; - } - } else { - if (tableCache.cache.replace(startKey, oldLocs, newLocs)) { - recordClearRegionCache(); - return; - } + if (tableCache != null) { + if (tableCache.regionLocationCache.remove(loc)) { + recordClearRegionCache(); + updateMetaReplicaSelector(loc); } } } + private void updateMetaReplicaSelector(HRegionLocation loc) { + // Tell metaReplicaSelector that the location is stale. It will create a stale entry + // with timestamp internally. Next time the client looks up the same location, + // it will pick a different meta replica region. + if (metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) { + metaReplicaSelector.onError(loc); + } + } + void addLocationToCache(HRegionLocation loc) { - addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc)); + getTableCache(loc.getRegion().getTable()).regionLocationCache.add(createRegionLocations(loc)); } private HRegionLocation getCachedLocation(HRegionLocation loc) { @@ -692,7 +562,7 @@ class AsyncNonMetaRegionLocator { if (tableCache == null) { return null; } - RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey()); + RegionLocations locs = tableCache.regionLocationCache.get(loc.getRegion().getStartKey()); return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null; } @@ -717,8 +587,8 @@ class AsyncNonMetaRegionLocator { } } futureResultList.forEach(RegionLocationsFutureResult::complete); - conn.getConnectionMetrics() - .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size())); + conn.getConnectionMetrics().ifPresent( + metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.regionLocationCache.size())); } void clearCache() { @@ -727,19 +597,7 @@ class AsyncNonMetaRegionLocator { void clearCache(ServerName serverName) { for (TableCache tableCache : cache.values()) { - for (Map.Entry entry : tableCache.cache.entrySet()) { - byte[] regionName = entry.getKey(); - RegionLocations locs = entry.getValue(); - RegionLocations newLocs = locs.removeByServer(serverName); - if (locs == newLocs) { - continue; - } - if (newLocs.isEmpty()) { - tableCache.cache.remove(regionName, locs); - } else { - tableCache.cache.replace(regionName, locs, newLocs); - } - } + tableCache.regionLocationCache.removeForServer(serverName); } } @@ -749,6 +607,7 @@ class AsyncNonMetaRegionLocator { if (tableCache == null) { return null; } - return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); + return locateRowInCache(tableCache, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); } + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocationCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocationCache.java new file mode 100644 index 00000000000..ee5ccaf33d6 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocationCache.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation; +import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; +import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.Objects; + +/** + * Cache of RegionLocations for use by {@link AsyncNonMetaRegionLocator}. Wrapper around + * ConcurrentSkipListMap ensuring proper access to cached items. Updates are synchronized, but reads + * are not. + */ +final class AsyncRegionLocationCache { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocationCache.class); + + private final ConcurrentNavigableMap cache = + new ConcurrentSkipListMap<>(BYTES_COMPARATOR); + private final TableName tableName; + + public AsyncRegionLocationCache(TableName tableName) { + this.tableName = tableName; + } + + /** + * Add the given locations to the cache, merging with existing if necessary. Also cleans out any + * previously cached locations which may have been superseded by this one (i.e. in case of merged + * regions). See {@link MetaCacheUtil} cleanProblematicOverlappedRegions + * @param locs the locations to cache + * @return the final location (possibly merged) that was added to the cache + */ + public synchronized RegionLocations add(RegionLocations locs) { + byte[] startKey = locs.getRegionLocation().getRegion().getStartKey(); + RegionLocations oldLocs = cache.putIfAbsent(startKey, locs); + if (oldLocs == null) { + MetaCacheUtil.cleanProblematicOverlappedRegions(locs, cache); + return locs; + } + + // check whether the regions are the same, this usually happens when table is split/merged, + // or deleted and recreated again. + RegionInfo region = locs.getRegionLocation().getRegion(); + RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion(); + if (region.getEncodedName().equals(oldRegion.getEncodedName())) { + RegionLocations mergedLocs = oldLocs.mergeLocations(locs); + if (isEqual(mergedLocs, oldLocs)) { + // the merged one is the same with the old one, give up + LOG.trace("Will not add {} to cache because the old value {} " + + " is newer than us or has the same server name." + + " Maybe it is updated before we replace it", locs, oldLocs); + return oldLocs; + } + locs = mergedLocs; + } else { + // the region is different, here we trust the one we fetched. This maybe wrong but finally + // the upper layer can detect this and trigger removal of the wrong locations + if (LOG.isDebugEnabled()) { + LOG.debug("The newly fetch region {} is different from the old one {} for row '{}'," + + " try replaying the old one...", region, oldRegion, Bytes.toStringBinary(startKey)); + } + } + + cache.put(startKey, locs); + MetaCacheUtil.cleanProblematicOverlappedRegions(locs, cache); + return locs; + } + + private boolean isEqual(RegionLocations locs1, RegionLocations locs2) { + HRegionLocation[] locArr1 = locs1.getRegionLocations(); + HRegionLocation[] locArr2 = locs2.getRegionLocations(); + if (locArr1.length != locArr2.length) { + return false; + } + for (int i = 0; i < locArr1.length; i++) { + // do not need to compare region info + HRegionLocation loc1 = locArr1[i]; + HRegionLocation loc2 = locArr2[i]; + if (loc1 == null) { + if (loc2 != null) { + return false; + } + } else { + if (loc2 == null) { + return false; + } + if (loc1.getSeqNum() != loc2.getSeqNum()) { + return false; + } + if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) { + return false; + } + } + } + return true; + } + + /** + * Returns all cached RegionLocations + */ + public Collection getAll() { + return Collections.unmodifiableCollection(cache.values()); + } + + /** + * Gets the RegionLocations for a given region's startKey. This is a direct lookup, if the key + * does not exist in the cache it will return null. + * @param startKey region start key to directly look up + */ + public RegionLocations get(byte[] startKey) { + return cache.get(startKey); + } + + /** + * Finds the RegionLocations for the region with the greatest startKey less than or equal to the + * given row + * @param row row to find locations + */ + public RegionLocations findForRow(byte[] row, int replicaId) { + Map.Entry entry = cache.floorEntry(row); + if (entry == null) { + return null; + } + RegionLocations locs = entry.getValue(); + if (locs == null) { + return null; + } + HRegionLocation loc = locs.getRegionLocation(replicaId); + if (loc == null) { + return null; + } + byte[] endKey = loc.getRegion().getEndKey(); + if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, + Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId); + } + return locs; + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Requested row {} comes after region end key of {} for cached location {}", + Bytes.toStringBinary(row), Bytes.toStringBinary(endKey), locs); + } + return null; + } + } + + /** + * Finds the RegionLocations for the region with the greatest startKey strictly less than the + * given row + * @param row row to find locations + */ + public RegionLocations findForBeforeRow(byte[] row, int replicaId) { + boolean isEmptyStopRow = isEmptyStopRow(row); + Map.Entry entry = + isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row); + if (entry == null) { + return null; + } + RegionLocations locs = entry.getValue(); + if (locs == null) { + return null; + } + HRegionLocation loc = locs.getRegionLocation(replicaId); + if (loc == null) { + return null; + } + if ( + isEmptyStopRow(loc.getRegion().getEndKey()) + || (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0) + ) { + if (LOG.isTraceEnabled()) { + LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, + Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId); + } + return locs; + } else { + return null; + } + } + + /** + * Removes the location from the cache if it exists and can be removed. + * @return true if entry was removed + */ + public synchronized boolean remove(HRegionLocation loc) { + byte[] startKey = loc.getRegion().getStartKey(); + RegionLocations oldLocs = cache.get(startKey); + if (oldLocs == null) { + return false; + } + + HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId()); + if (!canUpdateOnError(loc, oldLoc)) { + return false; + } + + RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId()); + if (newLocs == null) { + if (cache.remove(startKey, oldLocs)) { + return true; + } + } else { + cache.put(startKey, newLocs); + return true; + } + return false; + } + + /** + * Returns the size of the region locations cache + */ + public int size() { + return cache.size(); + } + + /** + * Removes serverName from all locations in the cache, fully removing any RegionLocations which + * are empty after removing the server from it. + * @param serverName server to remove from locations + */ + public synchronized void removeForServer(ServerName serverName) { + for (Map.Entry entry : cache.entrySet()) { + byte[] regionName = entry.getKey(); + RegionLocations locs = entry.getValue(); + RegionLocations newLocs = locs.removeByServer(serverName); + if (locs == newLocs) { + continue; + } + if (newLocs.isEmpty()) { + cache.remove(regionName, locs); + } else { + cache.put(regionName, newLocs); + } + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 4f9a1a3d951..5fec250793a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -111,12 +111,6 @@ public interface ClusterConnection extends Connection { */ void clearRegionCache(final TableName tableName); - /** - * Deletes cached locations for the specific region. - * @param location The location object for the region, to be purged from cache. - */ - void deleteCachedRegionLocation(final HRegionLocation location); - /** * Find the location of the region of tableName that row lives in, ignoring any * value that might be in the cache. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 4dc016ba7e9..97c29e86a9a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -2037,11 +2037,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { cacheLocation(hri.getTable(), source, newHrl); } - @Override - public void deleteCachedRegionLocation(final HRegionLocation location) { - metaCache.clearCache(location); - } - /** * Update the location with the new value (if the exception is a RegionMovedException) or delete * it from the cache. Does nothing if we can be sure from the exception that the location is still diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java index e0b3240b023..10e216741d8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java @@ -45,7 +45,9 @@ public class MetaCache { private static final Logger LOG = LoggerFactory.getLogger(MetaCache.class); /** - * Map of table to table {@link HRegionLocation}s. + * Map of table to table {@link HRegionLocation}s.
+ * Despite being Concurrent, writes to the map should be synchronized because we have cases where + * we need to make multiple updates atomically. */ private final ConcurrentMap> cachedRegionLocations = @@ -101,22 +103,28 @@ public class MetaCache { return possibleRegion; } + if (LOG.isTraceEnabled()) { + LOG.trace("Requested row {} comes after region end key of {} for cached location {}", + Bytes.toStringBinary(row), Bytes.toStringBinary(endKey), possibleRegion); + } // Passed all the way through, so we got nothing - complete cache miss if (metrics != null) metrics.incrMetaCacheMiss(); return null; } /** - * Put a newly discovered HRegionLocation into the cache. + * Put a newly discovered HRegionLocation into the cache. Synchronize here because we may need to + * make multiple modifications in cleanProblematicOverlappedRegions, and we want them to be + * atomic. * @param tableName The table name. * @param source the source of the new location * @param location the new location */ - public void cacheLocation(final TableName tableName, final ServerName source, + public synchronized void cacheLocation(final TableName tableName, final ServerName source, final HRegionLocation location) { assert source != null; byte[] startKey = location.getRegion().getStartKey(); - ConcurrentMap tableLocations = getTableLocations(tableName); + ConcurrentNavigableMap tableLocations = getTableLocations(tableName); RegionLocations locations = new RegionLocations(new HRegionLocation[] { location }); RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations); boolean isNewCacheEntry = (oldLocations == null); @@ -125,6 +133,7 @@ public class MetaCache { LOG.trace("Cached location: " + location); } addToCachedServers(locations); + MetaCacheUtil.cleanProblematicOverlappedRegions(locations, tableLocations); return; } @@ -141,8 +150,9 @@ public class MetaCache { // an additional counter on top of seqNum would be necessary to handle them all. RegionLocations updatedLocations = oldLocations.updateLocation(location, false, force); if (oldLocations != updatedLocations) { - boolean replaced = tableLocations.replace(startKey, oldLocations, updatedLocations); - if (replaced && LOG.isTraceEnabled()) { + tableLocations.put(startKey, updatedLocations); + MetaCacheUtil.cleanProblematicOverlappedRegions(updatedLocations, tableLocations); + if (LOG.isTraceEnabled()) { LOG.trace("Changed cached location to: " + location); } addToCachedServers(updatedLocations); @@ -150,13 +160,16 @@ public class MetaCache { } /** - * Put a newly discovered HRegionLocation into the cache. + * Put a newly discovered HRegionLocation into the cache. Synchronize here because we may need to + * make multiple modifications in cleanProblematicOverlappedRegions, and we want them to be + * atomic. * @param tableName The table name. * @param locations the new locations */ - public void cacheLocation(final TableName tableName, final RegionLocations locations) { + public synchronized void cacheLocation(final TableName tableName, + final RegionLocations locations) { byte[] startKey = locations.getRegionLocation().getRegion().getStartKey(); - ConcurrentMap tableLocations = getTableLocations(tableName); + ConcurrentNavigableMap tableLocations = getTableLocations(tableName); RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations); boolean isNewCacheEntry = (oldLocation == null); if (isNewCacheEntry) { @@ -164,6 +177,7 @@ public class MetaCache { LOG.trace("Cached location: " + locations); } addToCachedServers(locations); + MetaCacheUtil.cleanProblematicOverlappedRegions(locations, tableLocations); return; } @@ -171,8 +185,9 @@ public class MetaCache { // Meta record might be stale - some (probably the same) server has closed the region // with later seqNum and told us about the new location. RegionLocations mergedLocation = oldLocation.mergeLocations(locations); - boolean replaced = tableLocations.replace(startKey, oldLocation, mergedLocation); - if (replaced && LOG.isTraceEnabled()) { + tableLocations.put(startKey, mergedLocation); + MetaCacheUtil.cleanProblematicOverlappedRegions(mergedLocation, tableLocations); + if (LOG.isTraceEnabled()) { LOG.trace("Merged cached locations: " + mergedLocation); } addToCachedServers(locations); @@ -186,7 +201,11 @@ public class MetaCache { } } - /** Returns Map of cached locations for passed tableName */ + /** + * Returns Map of cached locations for passed tableName.
+ * Despite being Concurrent, writes to the map should be synchronized because we have cases where + * we need to make multiple updates atomically. + */ private ConcurrentNavigableMap getTableLocations(final TableName tableName) { // find the map of cached locations for this table @@ -221,48 +240,48 @@ public class MetaCache { } /** - * Delete all cached entries. + * Delete all cached entries.
+ * Synchronized because of calls in cacheLocation which need to be executed atomically */ - public void clearCache() { + public synchronized void clearCache() { this.cachedRegionLocations.clear(); this.cachedServers.clear(); } /** - * Delete all cached entries of a server. + * Delete all cached entries of a server.
+ * Synchronized because of calls in cacheLocation which need to be executed atomically */ - public void clearCache(final ServerName serverName) { + public synchronized void clearCache(final ServerName serverName) { + // Prior to synchronizing this method, we used to do another check below while synchronizing + // on cachedServers. This is no longer necessary since we moved synchronization up. + // Prior reason: + // We block here, because if there is an error on a server, it's likely that multiple + // threads will get the error simultaneously. If there are hundreds of thousand of + // region location to check, it's better to do this only once. A better pattern would + // be to check if the server is dead when we get the region location. if (!this.cachedServers.contains(serverName)) { return; } boolean deletedSomething = false; - synchronized (this.cachedServers) { - // We block here, because if there is an error on a server, it's likely that multiple - // threads will get the error simultaneously. If there are hundreds of thousand of - // region location to check, it's better to do this only once. A better pattern would - // be to check if the server is dead when we get the region location. - if (!this.cachedServers.contains(serverName)) { - return; - } - for (ConcurrentMap tableLocations : cachedRegionLocations.values()) { - for (Entry e : tableLocations.entrySet()) { - RegionLocations regionLocations = e.getValue(); - if (regionLocations != null) { - RegionLocations updatedLocations = regionLocations.removeByServer(serverName); - if (updatedLocations != regionLocations) { - if (updatedLocations.isEmpty()) { - deletedSomething |= tableLocations.remove(e.getKey(), regionLocations); - } else { - deletedSomething |= - tableLocations.replace(e.getKey(), regionLocations, updatedLocations); - } + for (ConcurrentMap tableLocations : cachedRegionLocations.values()) { + for (Entry e : tableLocations.entrySet()) { + RegionLocations regionLocations = e.getValue(); + if (regionLocations != null) { + RegionLocations updatedLocations = regionLocations.removeByServer(serverName); + if (updatedLocations != regionLocations) { + deletedSomething = true; + if (updatedLocations.isEmpty()) { + tableLocations.remove(e.getKey()); + } else { + tableLocations.put(e.getKey(), updatedLocations); } } } } - this.cachedServers.remove(serverName); } + this.cachedServers.remove(serverName); if (deletedSomething) { if (metrics != null) { metrics.incrMetaCacheNumClearServer(); @@ -274,9 +293,31 @@ public class MetaCache { } /** - * Delete all cached entries of a table. + * Delete a cached location, no matter what it is. Called when we were told to not use cache.
+ * Synchronized because of calls in cacheLocation which need to be executed atomically + * @param tableName tableName */ - public void clearCache(final TableName tableName) { + public synchronized void clearCache(final TableName tableName, final byte[] row) { + ConcurrentMap tableLocations = getTableLocations(tableName); + + RegionLocations regionLocations = getCachedLocation(tableName, row); + if (regionLocations != null) { + byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey(); + tableLocations.remove(startKey); + if (metrics != null) { + metrics.incrMetaCacheNumClearRegion(); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Removed " + regionLocations + " from cache"); + } + } + } + + /** + * Delete all cached entries of a table.
+ * Synchronized because of calls in cacheLocation which need to be executed atomically + */ + public synchronized void clearCache(final TableName tableName) { if (LOG.isTraceEnabled()) { LOG.trace("Removed all cached region locations for table " + tableName); } @@ -284,34 +325,13 @@ public class MetaCache { } /** - * Delete a cached location, no matter what it is. Called when we were told to not use cache. - * @param tableName tableName - */ - public void clearCache(final TableName tableName, final byte[] row) { - ConcurrentMap tableLocations = getTableLocations(tableName); - - RegionLocations regionLocations = getCachedLocation(tableName, row); - if (regionLocations != null) { - byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey(); - boolean removed = tableLocations.remove(startKey, regionLocations); - if (removed) { - if (metrics != null) { - metrics.incrMetaCacheNumClearRegion(); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Removed " + regionLocations + " from cache"); - } - } - } - } - - /** - * Delete a cached location with specific replicaId. + * Delete a cached location with specific replicaId.
+ * Synchronized because of calls in cacheLocation which need to be executed atomically * @param tableName tableName * @param row row key * @param replicaId region replica id */ - public void clearCache(final TableName tableName, final byte[] row, int replicaId) { + public synchronized void clearCache(final TableName tableName, final byte[] row, int replicaId) { ConcurrentMap tableLocations = getTableLocations(tableName); RegionLocations regionLocations = getCachedLocation(tableName, row); @@ -320,29 +340,28 @@ public class MetaCache { if (toBeRemoved != null) { RegionLocations updatedLocations = regionLocations.remove(replicaId); byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey(); - boolean removed; if (updatedLocations.isEmpty()) { - removed = tableLocations.remove(startKey, regionLocations); + tableLocations.remove(startKey); } else { - removed = tableLocations.replace(startKey, regionLocations, updatedLocations); + tableLocations.put(startKey, updatedLocations); } - if (removed) { - if (metrics != null) { - metrics.incrMetaCacheNumClearRegion(); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Removed " + toBeRemoved + " from cache"); - } + if (metrics != null) { + metrics.incrMetaCacheNumClearRegion(); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Removed " + toBeRemoved + " from cache"); } } } } /** - * Delete a cached location for a table, row and server + * Delete a cached location for a table, row and server.
+ * Synchronized because of calls in cacheLocation which need to be executed atomically */ - public void clearCache(final TableName tableName, final byte[] row, ServerName serverName) { + public synchronized void clearCache(final TableName tableName, final byte[] row, + ServerName serverName) { ConcurrentMap tableLocations = getTableLocations(tableName); RegionLocations regionLocations = getCachedLocation(tableName, row); @@ -350,81 +369,48 @@ public class MetaCache { RegionLocations updatedLocations = regionLocations.removeByServer(serverName); if (updatedLocations != regionLocations) { byte[] startKey = regionLocations.getRegionLocation().getRegion().getStartKey(); - boolean removed = false; if (updatedLocations.isEmpty()) { - removed = tableLocations.remove(startKey, regionLocations); + tableLocations.remove(startKey); } else { - removed = tableLocations.replace(startKey, regionLocations, updatedLocations); + tableLocations.put(startKey, updatedLocations); } - if (removed) { - if (metrics != null) { - metrics.incrMetaCacheNumClearRegion(); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Removed locations of table: " + tableName + " ,row: " + Bytes.toString(row) - + " mapping to server: " + serverName + " from cache"); - } + if (metrics != null) { + metrics.incrMetaCacheNumClearRegion(); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Removed locations of table: " + tableName + " ,row: " + Bytes.toString(row) + + " mapping to server: " + serverName + " from cache"); } } } } /** - * Deletes the cached location of the region if necessary, based on some error from source. + * Deletes the cached location of the region if necessary, based on some error from source.
+ * Synchronized because of calls in cacheLocation which need to be executed atomically * @param hri The region in question. */ - public void clearCache(RegionInfo hri) { + public synchronized void clearCache(RegionInfo hri) { ConcurrentMap tableLocations = getTableLocations(hri.getTable()); RegionLocations regionLocations = tableLocations.get(hri.getStartKey()); if (regionLocations != null) { HRegionLocation oldLocation = regionLocations.getRegionLocation(hri.getReplicaId()); if (oldLocation == null) return; RegionLocations updatedLocations = regionLocations.remove(oldLocation); - boolean removed; if (updatedLocations != regionLocations) { if (updatedLocations.isEmpty()) { - removed = tableLocations.remove(hri.getStartKey(), regionLocations); + tableLocations.remove(hri.getStartKey()); } else { - removed = tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations); + tableLocations.put(hri.getStartKey(), updatedLocations); } - if (removed) { - if (metrics != null) { - metrics.incrMetaCacheNumClearRegion(); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Removed " + oldLocation + " from cache"); - } + if (metrics != null) { + metrics.incrMetaCacheNumClearRegion(); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Removed " + oldLocation + " from cache"); } } } } - public void clearCache(final HRegionLocation location) { - if (location == null) { - return; - } - TableName tableName = location.getRegion().getTable(); - ConcurrentMap tableLocations = getTableLocations(tableName); - RegionLocations regionLocations = tableLocations.get(location.getRegion().getStartKey()); - if (regionLocations != null) { - RegionLocations updatedLocations = regionLocations.remove(location); - boolean removed; - if (updatedLocations != regionLocations) { - if (updatedLocations.isEmpty()) { - removed = tableLocations.remove(location.getRegion().getStartKey(), regionLocations); - } else { - removed = tableLocations.replace(location.getRegion().getStartKey(), regionLocations, - updatedLocations); - } - if (removed) { - if (metrics != null) { - metrics.incrMetaCacheNumClearRegion(); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Removed " + location + " from cache"); - } - } - } - } - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCacheUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCacheUtil.java new file mode 100644 index 00000000000..0cc5000ac97 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCacheUtil.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; + +import java.util.Map; +import java.util.concurrent.ConcurrentNavigableMap; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Util class to DRY common logic between AsyncRegionLocationCache and MetaCache + */ +@InterfaceAudience.Private +final class MetaCacheUtil { + + private static final Logger LOG = LoggerFactory.getLogger(MetaCacheUtil.class); + + private MetaCacheUtil() { + } + + /** + * When caching a location, the region may have been the result of a merge. Check to see if the + * region's boundaries overlap any other cached locations in a problematic way. Those would have + * been merge parents which no longer exist. We need to proactively clear them out to avoid a case + * where a merged region which receives no requests never gets cleared. This causes requests to + * other merged regions after it to see the wrong cached location. + *

+ * For example, if we have Start_New < Start_Old < End_Old < End_New, then if we only access + * within range [End_Old, End_New], then it will always return the old region but it will then + * find out the row is not in the range, and try to get the new region, and then we get + * [Start_New, End_New), still fall into the same situation. + *

+ * If Start_Old is less than Start_New, even if we have overlap, it is not a problem, as when the + * row is greater than Start_New, we will locate to the new region, and if the row is less than + * Start_New, it will fall into the old region's range and we will try to access the region and + * get a NotServing exception, and then we will clean the cache. + *

+ * See HBASE-27650 + * @param locations the new location that was just cached + */ + static void cleanProblematicOverlappedRegions(RegionLocations locations, + ConcurrentNavigableMap cache) { + RegionInfo region = locations.getRegionLocation().getRegion(); + + boolean isLast = isEmptyStopRow(region.getEndKey()); + + while (true) { + Map.Entry overlap = + isLast ? cache.lastEntry() : cache.lowerEntry(region.getEndKey()); + if ( + overlap == null || overlap.getValue() == locations + || Bytes.equals(overlap.getKey(), region.getStartKey()) + ) { + break; + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Removing cached location {} (endKey={}) because it overlaps with " + + "new location {} (endKey={})", + overlap.getValue(), + Bytes.toStringBinary(overlap.getValue().getRegionLocation().getRegion().getEndKey()), + locations, Bytes.toStringBinary(locations.getRegionLocation().getRegion().getEndKey())); + } + + cache.remove(overlap.getKey()); + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index 0f9b5a5ee86..923114d61ad 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -558,6 +558,10 @@ public final class MetricsConnection implements StatisticTrackable { metaCacheMisses.inc(); } + public long getMetaCacheMisses() { + return metaCacheMisses.getCount(); + } + /** Increment the number of meta cache drops requested for entire RegionServer. */ public void incrMetaCacheNumClearServer() { metaCacheNumClearServer.inc(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java index e69469ae3cd..816a8f45780 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java @@ -76,7 +76,8 @@ public class TestAsyncTableLocatePrefetch { // confirm that the locations of all the regions have been cached. assertNotNull(LOCATOR.getRegionLocationInCache(TABLE_NAME, Bytes.toBytes("aaa"))); for (byte[] row : HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE) { - assertNotNull(LOCATOR.getRegionLocationInCache(TABLE_NAME, row)); + assertNotNull("Expected location to not be null for " + Bytes.toStringBinary(row), + LOCATOR.getRegionLocationInCache(TABLE_NAME, row)); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java index 87b704cbe9a..b7f520655c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java @@ -26,6 +26,10 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -47,10 +51,12 @@ import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.function.ThrowingRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,6 +108,128 @@ public class TestMetaCache { TEST_UTIL.shutdownMiniCluster(); } + @Test + public void testMergeEmptyWithMetaCache() throws Throwable { + TableName tableName = TableName.valueOf("MergeEmpty"); + byte[] family = Bytes.toBytes("CF"); + TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); + TEST_UTIL.getAdmin().createTable(td, new byte[][] { Bytes.toBytes(2), Bytes.toBytes(5) }); + TEST_UTIL.waitTableAvailable(tableName); + TEST_UTIL.waitUntilNoRegionsInTransition(); + RegionInfo regionA = null; + RegionInfo regionB = null; + RegionInfo regionC = null; + for (RegionInfo region : TEST_UTIL.getAdmin().getRegions(tableName)) { + if (region.getStartKey().length == 0) { + regionA = region; + } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(2))) { + regionB = region; + } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(5))) { + regionC = region; + } + } + + assertNotNull(regionA); + assertNotNull(regionB); + assertNotNull(regionC); + + TEST_UTIL.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, + true); + try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); + AsyncConnection asyncConn = + ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) { + ConnectionImplementation connImpl = (ConnectionImplementation) conn; + AsyncConnectionImpl asyncConnImpl = (AsyncConnectionImpl) asyncConn; + + MetricsConnection metrics = connImpl.getConnectionMetrics(); + MetricsConnection asyncMetrics = asyncConnImpl.getConnectionMetrics().get(); + + // warm meta cache + conn.getRegionLocator(tableName).getAllRegionLocations(); + asyncConn.getRegionLocator(tableName).getAllRegionLocations().get(); + + Assert.assertEquals(3, TEST_UTIL.getAdmin().getRegions(tableName).size()); + + // Merge the 3 regions into one + TEST_UTIL.getAdmin().mergeRegionsAsync( + new byte[][] { regionA.getRegionName(), regionB.getRegionName(), regionC.getRegionName() }, + false).get(30, TimeUnit.SECONDS); + + Assert.assertEquals(1, TEST_UTIL.getAdmin().getRegions(tableName).size()); + + Table table = conn.getTable(tableName); + AsyncTable asyncTable = asyncConn.getTable(tableName); + + // This request should cause us to cache the newly merged region. + // As part of caching that region, it should clear out any cached merge parent regions which + // are overlapped by the new region. That way, subsequent calls below won't fall into the + // bug in HBASE-27650. Otherwise, a request for row 6 would always get stuck on cached + // regionB and we'd continue to see cache misses below. + assertTrue(executeAndGetNewMisses(() -> table.get(new Get(Bytes.toBytes(6))), metrics) > 0); + assertTrue( + executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(), asyncMetrics) + > 0); + + // We verify no new cache misses here due to above, which proves we've fixed up the cache + assertEquals(0, executeAndGetNewMisses(() -> table.get(new Get(Bytes.toBytes(6))), metrics)); + assertEquals(0, executeAndGetNewMisses(() -> asyncTable.get(new Get(Bytes.toBytes(6))).get(), + asyncMetrics)); + } + } + + private long executeAndGetNewMisses(ThrowingRunnable runnable, MetricsConnection metrics) + throws Throwable { + long lastVal = metrics.getMetaCacheMisses(); + runnable.run(); + long curVal = metrics.getMetaCacheMisses(); + return curVal - lastVal; + } + + /** + * Test that our cleanOverlappingRegions doesn't incorrectly remove regions from cache. Originally + * encountered when using floorEntry rather than lowerEntry. + */ + @Test + public void testAddToCacheReverse() throws IOException, InterruptedException, ExecutionException { + try ( + AsyncConnectionImpl asyncConn = (AsyncConnectionImpl) ConnectionFactory + .createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + ConnectionImplementation conn = (ConnectionImplementation) ConnectionFactory + .createConnection(TEST_UTIL.getConfiguration())) { + + AsyncNonMetaRegionLocator asyncLocator = asyncConn.getLocator().getNonMetaRegionLocator(); + + TableName tableName = TableName.valueOf("testAddToCache"); + byte[] family = Bytes.toBytes("CF"); + TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); + int maxSplits = 10; + List splits = + IntStream.range(1, maxSplits).mapToObj(Bytes::toBytes).collect(Collectors.toList()); + + TEST_UTIL.getAdmin().createTable(td, splits.toArray(new byte[0][])); + TEST_UTIL.waitTableAvailable(tableName); + TEST_UTIL.waitUntilNoRegionsInTransition(); + conn.getRegionLocator(tableName); + + assertEquals(splits.size() + 1, TEST_UTIL.getAdmin().getRegions(tableName).size()); + + RegionLocator locatorForTable = conn.getRegionLocator(tableName); + AsyncTableRegionLocator asyncLocatorForTable = asyncConn.getRegionLocator(tableName); + for (int i = maxSplits; i >= 0; i--) { + locatorForTable.getRegionLocation(Bytes.toBytes(i)); + asyncLocatorForTable.getRegionLocation(Bytes.toBytes(i)); + } + + for (int i = 0; i < maxSplits; i++) { + assertNotNull(asyncLocator.getRegionLocationInCache(tableName, Bytes.toBytes(i))); + assertNotNull(conn.getCachedLocation(tableName, Bytes.toBytes(i))); + } + } + + } + @Test public void testPreserveMetaCacheOnException() throws Exception { ((FakeRSRpcServices) badRS.getRSRpcServices())