diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 121a16bbaef..6cad6a27580 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -66,7 +66,7 @@ class AsyncConnectionImpl implements AsyncConnection { private final User user; - private final AsyncRegistry registry; + final AsyncRegistry registry; private final String clusterId; @@ -87,15 +87,11 @@ class AsyncConnectionImpl implements AsyncConnection { private final ConcurrentMap rsStubs = new ConcurrentHashMap<>(); @SuppressWarnings("deprecation") - public AsyncConnectionImpl(Configuration conf, User user) throws IOException { + public AsyncConnectionImpl(Configuration conf, User user) { this.conf = conf; this.user = user; - this.connConf = new AsyncConnectionConfiguration(conf); - - this.locator = new AsyncRegionLocator(conf); - - // action below will not throw exception so no need to catch and close. + this.locator = new AsyncRegionLocator(this); this.registry = ClusterRegistryFactory.getRegistry(conf); this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> { if (LOG.isDebugEnabled()) { @@ -122,7 +118,6 @@ class AsyncConnectionImpl implements AsyncConnection { @Override public void close() { - IOUtils.closeQuietly(locator); IOUtils.closeQuietly(rpcClient); IOUtils.closeQuietly(registry); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index 321fd7166aa..ba5a0e079c3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -17,71 +17,437 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.*; +import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY; +import static org.apache.hadoop.hbase.HConstants.NINES; +import static org.apache.hadoop.hbase.HConstants.ZEROES; +import static org.apache.hadoop.hbase.HRegionInfo.createRegionName; +import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; +import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; +import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException; +import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; -import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Function; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.util.Bytes; /** - * TODO: reimplement using aync connection when the scan logic is ready. The current implementation - * is based on the blocking client. + * The asynchronous region locator. */ @InterfaceAudience.Private -class AsyncRegionLocator implements Closeable { +class AsyncRegionLocator { - private final ConnectionImplementation conn; + private static final Log LOG = LogFactory.getLog(AsyncRegionLocator.class); - AsyncRegionLocator(Configuration conf) throws IOException { - conn = (ConnectionImplementation) ConnectionFactory.createConnection(conf); + private final AsyncConnectionImpl conn; + + private final AtomicReference metaRegionLocation = new AtomicReference<>(); + + private final AtomicReference> metaRelocateFuture = + new AtomicReference<>(); + + private final ConcurrentMap> cache = + new ConcurrentHashMap<>(); + + AsyncRegionLocator(AsyncConnectionImpl conn) { + this.conn = conn; } - CompletableFuture getRegionLocation(TableName tableName, byte[] row, - boolean reload) { - CompletableFuture future = new CompletableFuture<>(); - try { - future.complete(conn.getRegionLocation(tableName, row, reload)); - } catch (IOException e) { - future.completeExceptionally(e); - } - return future; - } - - CompletableFuture getPreviousRegionLocation(TableName tableName, - byte[] startRowOfCurrentRegion, boolean reload) { - CompletableFuture future = new CompletableFuture<>(); - byte[] toLocateRow = createClosestRowBefore(startRowOfCurrentRegion); - try { - for (;;) { - HRegionLocation loc = conn.getRegionLocation(tableName, toLocateRow, reload); - byte[] endKey = loc.getRegionInfo().getEndKey(); - if (Bytes.equals(startRowOfCurrentRegion, endKey)) { - future.complete(loc); - break; - } - toLocateRow = endKey; + private CompletableFuture locateMetaRegion() { + for (;;) { + HRegionLocation metaRegionLocation = this.metaRegionLocation.get(); + if (metaRegionLocation != null) { + return CompletableFuture.completedFuture(metaRegionLocation); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Meta region location cache is null, try fetching from registry."); + } + if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Start fetching meta region location from registry."); + } + CompletableFuture future = metaRelocateFuture.get(); + conn.registry.getMetaRegionLocation().whenComplete((locs, error) -> { + if (error != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to fetch meta region location from registry", error); + } + metaRelocateFuture.getAndSet(null).completeExceptionally(error); + return; + } + HRegionLocation loc = locs.getDefaultRegionLocation(); + if (LOG.isDebugEnabled()) { + LOG.debug("The fetched meta region location is " + loc); + } + // Here we update cache before reset future, so it is possible that someone can get a + // stale value. Consider this: + // 1. update cache + // 2. someone clear the cache and relocate again + // 3. the metaRelocateFuture is not null so the old future is used. + // 4. we clear metaRelocateFuture and complete the future in it with the value being + // cleared in step 2. + // But we do not think it is a big deal as it rarely happens, and even if it happens, the + // caller will retry again later, no correctness problems. + this.metaRegionLocation.set(loc); + metaRelocateFuture.set(null); + future.complete(loc); + }); + } else { + CompletableFuture future = metaRelocateFuture.get(); + if (future != null) { + return future; + } } - } catch (IOException e) { - future.completeExceptionally(e); } + } + + private static ConcurrentNavigableMap createTableCache() { + return new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); + } + + private void removeFromCache(HRegionLocation loc) { + ConcurrentNavigableMap tableCache = + cache.get(loc.getRegionInfo().getTable()); + if (tableCache == null) { + return; + } + tableCache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> { + if (oldLoc.getSeqNum() > loc.getSeqNum() + || !oldLoc.getServerName().equals(loc.getServerName())) { + return oldLoc; + } + return null; + }); + } + + private void addToCache(HRegionLocation loc) { + if (LOG.isTraceEnabled()) { + LOG.trace("Try adding " + loc + " to cache"); + } + ConcurrentNavigableMap tableCache = computeIfAbsent(cache, + loc.getRegionInfo().getTable(), AsyncRegionLocator::createTableCache); + byte[] startKey = loc.getRegionInfo().getStartKey(); + HRegionLocation oldLoc = tableCache.putIfAbsent(startKey, loc); + if (oldLoc == null) { + return; + } + if (oldLoc.getSeqNum() > loc.getSeqNum() + || oldLoc.getServerName().equals(loc.getServerName())) { + if (LOG.isTraceEnabled()) { + LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc + + " is newer than us or has the same server name"); + } + return; + } + tableCache.compute(startKey, (k, oldValue) -> { + if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) { + return loc; + } + if (LOG.isTraceEnabled()) { + LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue + + " is newer than us or has the same server name." + + " Maybe it is updated before we replace it"); + } + return oldValue; + }); + } + + private HRegionLocation locateInCache(TableName tableName, byte[] row) { + ConcurrentNavigableMap tableCache = cache.get(tableName); + if (tableCache == null) { + return null; + } + Map.Entry entry = tableCache.floorEntry(row); + if (entry == null) { + return null; + } + HRegionLocation loc = entry.getValue(); + byte[] endKey = loc.getRegionInfo().getEndKey(); + if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) { + return loc; + } else { + return null; + } + } + + private void onScanComplete(CompletableFuture future, TableName tableName, + byte[] row, List results, Throwable error, String rowNameInErrorMsg, + Consumer otherCheck) { + if (error != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to fetch location of '" + tableName + "', " + rowNameInErrorMsg + "='" + + Bytes.toStringBinary(row) + "'", + error); + } + future.completeExceptionally(error); + return; + } + if (results.isEmpty()) { + future.completeExceptionally(new TableNotFoundException(tableName)); + return; + } + RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0)); + if (LOG.isDebugEnabled()) { + LOG.debug("The fetched location of '" + tableName + "', " + rowNameInErrorMsg + "='" + + Bytes.toStringBinary(row) + "' is " + locs); + } + if (locs == null || locs.getDefaultRegionLocation() == null) { + future.completeExceptionally( + new IOException(String.format("No location found for '%s', %s='%s'", tableName, + rowNameInErrorMsg, Bytes.toStringBinary(row)))); + return; + } + HRegionLocation loc = locs.getDefaultRegionLocation(); + HRegionInfo info = loc.getRegionInfo(); + if (info == null) { + future.completeExceptionally( + new IOException(String.format("HRegionInfo is null for '%s', %s='%s'", tableName, + rowNameInErrorMsg, Bytes.toStringBinary(row)))); + return; + } + if (!info.getTable().equals(tableName)) { + future.completeExceptionally(new TableNotFoundException( + "Table '" + tableName + "' was not found, got: '" + info.getTable() + "'")); + return; + } + if (info.isSplit()) { + future.completeExceptionally(new RegionOfflineException( + "the only available region for the required row is a split parent," + + " the daughters should be online soon: '" + info.getRegionNameAsString() + "'")); + return; + } + if (info.isOffline()) { + future.completeExceptionally(new RegionOfflineException("the region is offline, could" + + " be caused by a disable table call: '" + info.getRegionNameAsString() + "'")); + return; + } + if (loc.getServerName() == null) { + future.completeExceptionally(new NoServerForRegionException( + String.format("No server address listed for region '%s', %s='%s'", + info.getRegionNameAsString(), rowNameInErrorMsg, Bytes.toStringBinary(row)))); + return; + } + otherCheck.accept(loc); + if (future.isDone()) { + return; + } + addToCache(loc); + future.complete(loc); + } + + private CompletableFuture locateInMeta(TableName tableName, byte[] row) { + if (LOG.isTraceEnabled()) { + LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(row) + "' in meta"); + } + CompletableFuture future = new CompletableFuture<>(); + byte[] metaKey = createRegionName(tableName, row, NINES, false); + conn.getTable(META_TABLE_NAME) + .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1) + .whenComplete( + (results, error) -> onScanComplete(future, tableName, row, results, error, "row", loc -> { + })); return future; } - void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception, - ServerName source) { - conn.updateCachedLocations(tableName, regionName, row, exception, source); + private CompletableFuture locateRegion(TableName tableName, byte[] row) { + HRegionLocation loc = locateInCache(tableName, row); + if (loc != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" + + Bytes.toStringBinary(row) + "'"); + } + return CompletableFuture.completedFuture(loc); + } + return locateInMeta(tableName, row); } - @Override - public void close() { - IOUtils.closeQuietly(conn); + CompletableFuture getRegionLocation(TableName tableName, byte[] row) { + if (tableName.equals(META_TABLE_NAME)) { + return locateMetaRegion(); + } else { + return locateRegion(tableName, row); + } + } + + private HRegionLocation locatePreviousInCache(TableName tableName, + byte[] startRowOfCurrentRegion) { + ConcurrentNavigableMap tableCache = cache.get(tableName); + if (tableCache == null) { + return null; + } + Map.Entry entry; + if (isEmptyStopRow(startRowOfCurrentRegion)) { + entry = tableCache.lastEntry(); + } else { + entry = tableCache.lowerEntry(startRowOfCurrentRegion); + } + if (entry == null) { + return null; + } + HRegionLocation loc = entry.getValue(); + if (Bytes.equals(loc.getRegionInfo().getEndKey(), startRowOfCurrentRegion)) { + return loc; + } else { + return null; + } + } + + private CompletableFuture locatePreviousInMeta(TableName tableName, + byte[] startRowOfCurrentRegion) { + if (LOG.isTraceEnabled()) { + LOG.trace("Try locate '" + tableName + "', startRowOfCurrentRegion='" + + Bytes.toStringBinary(startRowOfCurrentRegion) + "' in meta"); + } + byte[] metaKey; + if (isEmptyStopRow(startRowOfCurrentRegion)) { + byte[] binaryTableName = tableName.getName(); + metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1); + } else { + metaKey = createRegionName(tableName, startRowOfCurrentRegion, ZEROES, false); + } + CompletableFuture future = new CompletableFuture<>(); + conn.getTable(META_TABLE_NAME) + .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1) + .whenComplete((results, error) -> onScanComplete(future, tableName, startRowOfCurrentRegion, + results, error, "startRowOfCurrentRegion", loc -> { + HRegionInfo info = loc.getRegionInfo(); + if (!Bytes.equals(info.getEndKey(), startRowOfCurrentRegion)) { + future.completeExceptionally(new IOException("The end key of '" + + info.getRegionNameAsString() + "' is '" + Bytes.toStringBinary(info.getEndKey()) + + "', expected '" + Bytes.toStringBinary(startRowOfCurrentRegion) + "'")); + } + })); + return future; + } + + private CompletableFuture locatePreviousRegion(TableName tableName, + byte[] startRowOfCurrentRegion) { + HRegionLocation loc = locatePreviousInCache(tableName, startRowOfCurrentRegion); + if (loc != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("Found " + loc + " in cache for '" + tableName + "', startRowOfCurrentRegion='" + + Bytes.toStringBinary(startRowOfCurrentRegion) + "'"); + } + return CompletableFuture.completedFuture(loc); + } + return locatePreviousInMeta(tableName, startRowOfCurrentRegion); + } + + /** + * Locate the previous region using the current regions start key. Used for reverse scan. + */ + CompletableFuture getPreviousRegionLocation(TableName tableName, + byte[] startRowOfCurrentRegion) { + if (tableName.equals(META_TABLE_NAME)) { + return locateMetaRegion(); + } else { + return locatePreviousRegion(tableName, startRowOfCurrentRegion); + } + } + + private boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) { + // Do not need to update if no such location, or the location is newer, or the location is not + // same with us + return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() + && oldLoc.getServerName().equals(loc.getServerName()); + } + + private void updateCachedLoation(HRegionLocation loc, Throwable exception, + Function cachedLocationSupplier, + Consumer addToCache, Consumer removeFromCache) { + HRegionLocation oldLoc = cachedLocationSupplier.apply(loc); + if (LOG.isDebugEnabled()) { + LOG.debug("Try updating " + loc + ", the old value is " + oldLoc, exception); + } + if (!canUpdate(loc, oldLoc)) { + return; + } + Throwable cause = findException(exception); + if (LOG.isDebugEnabled()) { + LOG.debug("The actual exception when updating " + loc, cause); + } + if (cause == null || !isMetaClearingException(cause)) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Will not update " + loc + " because the exception is null or not the one we care about"); + } + return; + } + if (cause instanceof RegionMovedException) { + RegionMovedException rme = (RegionMovedException) cause; + HRegionLocation newLoc = + new HRegionLocation(loc.getRegionInfo(), rme.getServerName(), rme.getLocationSeqNum()); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Try updating " + loc + " with the new location " + newLoc + " constructed by " + rme); + } + addToCache.accept(newLoc); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Try removing " + loc + " from cache"); + } + removeFromCache.accept(loc); + } + } + + void updateCachedLocation(HRegionLocation loc, Throwable exception) { + if (loc.getRegionInfo().isMetaTable()) { + updateCachedLoation(loc, exception, l -> metaRegionLocation.get(), newLoc -> { + for (;;) { + HRegionLocation oldLoc = metaRegionLocation.get(); + if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum() + || oldLoc.getServerName().equals(newLoc.getServerName()))) { + return; + } + if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) { + return; + } + } + }, l -> { + for (;;) { + HRegionLocation oldLoc = metaRegionLocation.get(); + if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) { + return; + } + } + }); + } else { + updateCachedLoation(loc, exception, l -> { + ConcurrentNavigableMap tableCache = + cache.get(l.getRegionInfo().getTable()); + if (tableCache == null) { + return null; + } + return tableCache.get(l.getRegionInfo().getStartKey()); + }, this::addToCache, this::removeFromCache); + } + } + + void clearCache(TableName tableName) { + if (LOG.isDebugEnabled()) { + LOG.debug("Clear meta cache for " + tableName); + } + cache.remove(tableName); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 9020ce50ceb..0d23c390166 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -68,8 +68,8 @@ class AsyncRpcRetryingCallerFactory { return this; } - public SingleRequestCallerBuilder - action(AsyncSingleRequestRpcRetryingCaller.Callable callable) { + public SingleRequestCallerBuilder action( + AsyncSingleRequestRpcRetryingCaller.Callable callable) { this.callable = callable; return this; } @@ -92,12 +92,9 @@ class AsyncRpcRetryingCallerFactory { public AsyncSingleRequestRpcRetryingCaller build() { return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), - locateToPreviousRegion - ? (c, tn, r, re) -> c.getLocator().getPreviousRegionLocation(tn, r, re) - : (c, tn, r, re) -> c.getLocator().getRegionLocation(tn, r, re), - checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(), - conn.connConf.getMaxRetries(), operationTimeoutNs, rpcTimeoutNs, - conn.connConf.getStartLogErrorsCnt()); + locateToPreviousRegion, checkNotNull(callable, "action is null"), + conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs, + rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt()); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 1d0357dffbf..f10c9a52004 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -60,12 +60,6 @@ class AsyncSingleRequestRpcRetryingCaller { ClientService.Interface stub); } - @FunctionalInterface - public interface RegionLocator { - CompletableFuture locate(AsyncConnectionImpl conn, TableName tableName, - byte[] row, boolean reload); - } - private final HashedWheelTimer retryTimer; private final AsyncConnectionImpl conn; @@ -74,7 +68,7 @@ class AsyncSingleRequestRpcRetryingCaller { private final byte[] row; - private final RegionLocator locator; + private final Supplier> locate; private final Callable callable; @@ -97,13 +91,18 @@ class AsyncSingleRequestRpcRetryingCaller { private final long startNs; public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, - TableName tableName, byte[] row, RegionLocator locator, Callable callable, long pauseNs, - int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + TableName tableName, byte[] row, boolean locateToPreviousRegion, Callable callable, + long pauseNs, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, + int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; this.tableName = tableName; this.row = row; - this.locator = locator; + if (locateToPreviousRegion) { + this.locate = this::locatePrevious; + } else { + this.locate = this::locate; + } this.callable = callable; this.pauseNs = pauseNs; this.maxAttempts = retries2Attempts(maxRetries); @@ -145,8 +144,9 @@ class AsyncSingleRequestRpcRetryingCaller { if (tries > startLogErrorsCnt) { LOG.warn(errMsg.get(), error); } - RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext( - error, EnvironmentEdgeManager.currentTime(), ""); + RetriesExhaustedException.ThrowableWithExtraContext qt = + new RetriesExhaustedException.ThrowableWithExtraContext(error, + EnvironmentEdgeManager.currentTime(), ""); exceptions.add(qt); if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { completeExceptionally(); @@ -194,8 +194,7 @@ class AsyncSingleRequestRpcRetryingCaller { + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = " + elapsedMs() + " ms", - err -> conn.getLocator().updateCachedLocations(tableName, - loc.getRegionInfo().getRegionName(), row, err, loc.getServerName())); + err -> conn.getLocator().updateCachedLocation(loc, err)); return; } resetController(); @@ -207,8 +206,7 @@ class AsyncSingleRequestRpcRetryingCaller { + tries + ", maxAttempts = " + maxAttempts + ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = " + elapsedMs() + " ms", - err -> conn.getLocator().updateCachedLocations(tableName, - loc.getRegionInfo().getRegionName(), row, err, loc.getServerName())); + err -> conn.getLocator().updateCachedLocation(loc, err)); return; } future.complete(result); @@ -216,7 +214,7 @@ class AsyncSingleRequestRpcRetryingCaller { } private void locateThenCall() { - locator.locate(conn, tableName, row, tries > 1).whenComplete((loc, error) -> { + locate.get().whenComplete((loc, error) -> { if (error != null) { onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = " @@ -231,6 +229,14 @@ class AsyncSingleRequestRpcRetryingCaller { }); } + private CompletableFuture locate() { + return conn.getLocator().getRegionLocation(tableName, row); + } + + private CompletableFuture locatePrevious() { + return conn.getLocator().getPreviousRegionLocation(tableName, row); + } + public CompletableFuture call() { locateThenCall(); return future; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java index d715e241ee4..b29f878e6d1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java @@ -45,6 +45,6 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator { @Override public CompletableFuture getRegionLocation(byte[] row, boolean reload) { - return locator.getRegionLocation(tableName, row, reload); + return locator.getRegionLocation(tableName, row); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java index 775f8bd714f..4e19b7729f9 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -107,6 +108,19 @@ public class CollectionUtils { return list.get(list.size() - 1); } + /** + * In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the + * value already exists. So here we copy the implementation of + * {@link ConcurrentMap#computeIfAbsent(Object, java.util.function.Function)}. It uses get and + * putIfAbsent to implement computeIfAbsent. And notice that the implementation does not guarantee + * that the supplier will only be executed once. + */ + public static V computeIfAbsent(ConcurrentMap map, K key, Supplier supplier) { + V v, newValue; + return ((v = map.get(key)) == null && (newValue = supplier.get()) != null + && (v = map.putIfAbsent(key, newValue)) == null) ? newValue : v; + } + /** * A supplier that throws IOException when get. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java new file mode 100644 index 00000000000..b20e616b491 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; +import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; +import static org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.TABLES_ON_MASTER; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Will split the table, and move region randomly when testing. + */ +@Category({ LargeTests.class, ClientTests.class }) +public class TestAsyncGetMultiThread { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] QUALIFIER = Bytes.toBytes("cq"); + + private static int COUNT = 1000; + + private static AsyncConnection CONN; + + private static byte[][] SPLIT_KEYS; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none"); + TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L); + TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L); + TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000); + TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100); + TEST_UTIL.startMiniCluster(5); + SPLIT_KEYS = new byte[8][]; + for (int i = 111; i < 999; i += 111) { + SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); + } + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); + AsyncTable table = CONN.getTable(TABLE_NAME); + List> futures = new ArrayList<>(); + IntStream.range(0, COUNT) + .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i))) + .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))))); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + } + + @AfterClass + public static void tearDown() throws Exception { + CONN.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException { + while (!stop.get()) { + int i = ThreadLocalRandom.current().nextInt(COUNT); + assertEquals(i, + Bytes.toInt(CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))) + .get().getValue(FAMILY, QUALIFIER))); + } + } + + @Test + public void test() throws IOException, InterruptedException, ExecutionException { + int numThreads = 20; + AtomicBoolean stop = new AtomicBoolean(false); + ExecutorService executor = + Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-")); + List> futures = new ArrayList<>(); + IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> { + run(stop); + return null; + }))); + Collections.shuffle(Arrays.asList(SPLIT_KEYS), new Random(123)); + Admin admin = TEST_UTIL.getAdmin(); + for (byte[] splitPoint : SPLIT_KEYS) { + admin.split(TABLE_NAME, splitPoint); + for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) { + region.compact(true); + } + Thread.sleep(5000); + admin.balancer(true); + Thread.sleep(5000); + ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); + ServerName newMetaServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() + .map(t -> t.getRegionServer().getServerName()).filter(s -> !s.equals(metaServer)) + .findAny().get(); + admin.move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), + Bytes.toBytes(newMetaServer.getServerName())); + Thread.sleep(5000); + } + stop.set(true); + executor.shutdown(); + for (Future future : futures) { + future.get(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java new file mode 100644 index 00000000000..2e46d8a19c8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; +import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncRegionLocator { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static AsyncConnectionImpl CONN; + + private static AsyncRegionLocator LOCATOR; + + private static byte[][] SPLIT_KEYS; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.startMiniCluster(3); + TEST_UTIL.getAdmin().setBalancerRunning(false, true); + CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent()); + LOCATOR = CONN.getLocator(); + SPLIT_KEYS = new byte[8][]; + for (int i = 111; i < 999; i += 111) { + SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); + } + } + + @AfterClass + public static void tearDown() throws Exception { + CONN.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + @After + public void tearDownAfterTest() throws IOException { + Admin admin = TEST_UTIL.getAdmin(); + if (admin.tableExists(TABLE_NAME)) { + if (admin.isTableEnabled(TABLE_NAME)) { + TEST_UTIL.getAdmin().disableTable(TABLE_NAME); + } + TEST_UTIL.getAdmin().deleteTable(TABLE_NAME); + } + LOCATOR.clearCache(TABLE_NAME); + } + + private void createSingleRegionTable() throws IOException, InterruptedException { + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + } + + @Test + public void testNoTable() throws InterruptedException { + try { + LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get(); + } catch (ExecutionException e) { + assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); + } + try { + LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get(); + } catch (ExecutionException e) { + assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); + } + } + + @Test + public void testDisableTable() throws IOException, InterruptedException { + createSingleRegionTable(); + TEST_UTIL.getAdmin().disableTable(TABLE_NAME); + try { + LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get(); + } catch (ExecutionException e) { + assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); + } + try { + LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get(); + } catch (ExecutionException e) { + assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); + } + } + + private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName, + HRegionLocation loc) { + HRegionInfo info = loc.getRegionInfo(); + assertEquals(TABLE_NAME, info.getTable()); + assertArrayEquals(startKey, info.getStartKey()); + assertArrayEquals(endKey, info.getEndKey()); + assertEquals(serverName, loc.getServerName()); + } + + @Test + public void testSingleRegionTable() throws IOException, InterruptedException, ExecutionException { + createSingleRegionTable(); + ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); + assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, + LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); + assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, + LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); + byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)]; + ThreadLocalRandom.current().nextBytes(randKey); + assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, + LOCATOR.getRegionLocation(TABLE_NAME, randKey).get()); + // Use a key which is not the endKey of a region will cause error + try { + assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, + LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }).get()); + } catch (ExecutionException e) { + assertThat(e.getCause(), instanceOf(IOException.class)); + assertTrue(e.getCause().getMessage().contains("end key of")); + } + } + + private void createMultiRegionTable() throws IOException, InterruptedException { + TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + } + + private static byte[][] getStartKeys() { + byte[][] startKeys = new byte[SPLIT_KEYS.length + 1][]; + startKeys[0] = EMPTY_START_ROW; + System.arraycopy(SPLIT_KEYS, 0, startKeys, 1, SPLIT_KEYS.length); + return startKeys; + } + + private static byte[][] getEndKeys() { + byte[][] endKeys = Arrays.copyOf(SPLIT_KEYS, SPLIT_KEYS.length + 1); + endKeys[endKeys.length - 1] = EMPTY_START_ROW; + return endKeys; + } + + @Test + public void testMultiRegionTable() throws IOException, InterruptedException { + createMultiRegionTable(); + byte[][] startKeys = getStartKeys(); + ServerName[] serverNames = new ServerName[startKeys.length]; + TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) + .forEach(rs -> { + rs.getOnlineRegions(TABLE_NAME).forEach(r -> { + serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(), + Bytes::compareTo)] = rs.getServerName(); + }); + }); + IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> { + try { + assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1], + serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i]).get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + })); + LOCATOR.clearCache(TABLE_NAME); + byte[][] endKeys = getEndKeys(); + IntStream.range(0, 2).forEach( + n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> { + try { + assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i], + LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i]).get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + })); + } + + @Test + public void testRegionMove() throws IOException, InterruptedException, ExecutionException { + createSingleRegionTable(); + ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); + HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get(); + assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc); + ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() + .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)) + .findAny().get(); + + TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegionInfo().getEncodedName()), + Bytes.toBytes(newServerName.getServerName())); + while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName() + .equals(newServerName)) { + Thread.sleep(100); + } + // Should be same as it is in cache + assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); + LOCATOR.updateCachedLocation(loc, null); + // null error will not trigger a cache cleanup + assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); + LOCATOR.updateCachedLocation(loc, new NotServingRegionException()); + assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, + LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java index fd3938ed66c..67d2661502b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java @@ -34,7 +34,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -151,11 +150,9 @@ public class TestAsyncSingleRequestRpcRetryingCaller { AtomicBoolean errorTriggered = new AtomicBoolean(false); AtomicInteger count = new AtomicInteger(0); HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get(); - - try (AsyncRegionLocator mockedLocator = new AsyncRegionLocator(asyncConn.getConfiguration()) { + AsyncRegionLocator mockedLocator = new AsyncRegionLocator(asyncConn) { @Override - CompletableFuture getRegionLocation(TableName tableName, byte[] row, - boolean reload) { + CompletableFuture getRegionLocation(TableName tableName, byte[] row) { if (tableName.equals(TABLE_NAME)) { CompletableFuture future = new CompletableFuture<>(); if (count.getAndIncrement() == 0) { @@ -166,17 +163,22 @@ public class TestAsyncSingleRequestRpcRetryingCaller { } return future; } else { - return super.getRegionLocation(tableName, row, reload); + return super.getRegionLocation(tableName, row); } } @Override - void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, - Object exception, ServerName source) { + CompletableFuture getPreviousRegionLocation(TableName tableName, + byte[] startRowOfCurrentRegion) { + return super.getPreviousRegionLocation(tableName, startRowOfCurrentRegion); + } + + @Override + void updateCachedLocation(HRegionLocation loc, Throwable exception) { } }; - AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(asyncConn.getConfiguration(), - User.getCurrent()) { + try (AsyncConnectionImpl mockedConn = + new AsyncConnectionImpl(asyncConn.getConfiguration(), User.getCurrent()) { @Override AsyncRegionLocator getLocator() {