diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index 31f369c9e3c..ab1f0dbfb87 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -29,18 +29,18 @@ import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; import java.io.IOException; import java.util.Arrays; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -107,7 +107,7 @@ class AsyncNonMetaRegionLocator { public final Set pendingRequests = new HashSet<>(); public final Map> allRequests = - new HashMap<>(); + new LinkedHashMap<>(); public boolean hasQuota(int max) { return pendingRequests.size() < max; @@ -120,6 +120,49 @@ class AsyncNonMetaRegionLocator { public void send(LocateRequest req) { pendingRequests.add(req); } + + public Optional getCandidate() { + return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst(); + } + + public void clearCompletedRequests(Optional location) { + for (Iterator>> iter = allRequests + .entrySet().iterator(); iter.hasNext();) { + Map.Entry> entry = iter.next(); + if (tryComplete(entry.getKey(), entry.getValue(), location)) { + iter.remove(); + } + } + } + + private boolean tryComplete(LocateRequest req, CompletableFuture future, + Optional location) { + if (future.isDone()) { + return true; + } + if (!location.isPresent()) { + return false; + } + HRegionLocation loc = location.get(); + boolean completed; + if (req.locateType.equals(RegionLocateType.BEFORE)) { + // for locating the row before current row, the common case is to find the previous region in + // reverse scan, so we check the endKey first. In general, the condition should be startKey < + // req.row and endKey >= req.row. Here we split it to endKey == req.row || (endKey > req.row + // && startKey < req.row). The two conditions are equal since startKey < endKey. + int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row); + completed = + c == 0 || (c > 0 && Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0); + } else { + completed = loc.getRegionInfo().containsRow(req.row); + } + if (completed) { + future.complete(loc); + return true; + } else { + return false; + } + } } AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) { @@ -186,48 +229,27 @@ class AsyncNonMetaRegionLocator { } } - private boolean tryComplete(LocateRequest req, CompletableFuture future, - HRegionLocation loc) { - if (future.isDone()) { - return true; - } - boolean completed; - if (req.locateType.equals(RegionLocateType.BEFORE)) { - // for locating the row before current row, the common case is to find the previous region in - // reverse scan, so we check the endKey first. In general, the condition should be startKey < - // req.row and endKey >= req.row. Here we split it to endKey == req.row || (endKey > req.row - // && startKey < req.row). The two conditions are equal since startKey < endKey. - int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row); - completed = - c == 0 || (c > 0 && Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0); - } else { - completed = loc.getRegionInfo().containsRow(req.row); - } - if (completed) { - future.complete(loc); - return true; - } else { - return false; - } - } - private void complete(TableName tableName, LocateRequest req, HRegionLocation loc, Throwable error) { if (error != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to locate region in '" + tableName + "', row='" + - Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, - error); - } + LOG.warn( + "Failed to locate region in '" + tableName + "', row='" + Bytes.toStringBinary(req.row) + + "', locateType=" + req.locateType, error); } - LocateRequest toSend = null; + Optional toSend = Optional.empty(); TableCache tableCache = getTableCache(tableName); if (loc != null) { if (!addToCache(tableCache, loc)) { // someone is ahead of us. synchronized (tableCache) { tableCache.pendingRequests.remove(req); + tableCache.clearCompletedRequests(Optional.empty()); + // Remove a complete locate request in a synchronized block, so the table cache must have + // quota to send a candidate request. + toSend = tableCache.getCandidate(); + toSend.ifPresent(r -> tableCache.send(r)); } + toSend.ifPresent(r -> locateInMeta(tableName, r)); return; } } @@ -239,30 +261,13 @@ class AsyncNonMetaRegionLocator { future.completeExceptionally(error); } } - if (loc != null) { - for (Iterator>> iter = - tableCache.allRequests.entrySet().iterator(); iter.hasNext();) { - Map.Entry> entry = iter.next(); - if (tryComplete(entry.getKey(), entry.getValue(), loc)) { - iter.remove(); - } - } - } - if (!tableCache.allRequests.isEmpty() && - tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) { - LocateRequest[] candidates = tableCache.allRequests.keySet().stream() - .filter(r -> !tableCache.isPending(r)).toArray(LocateRequest[]::new); - if (candidates.length > 0) { - // TODO: use a better algorithm to send a request which is more likely to fetch a new - // location. - toSend = candidates[ThreadLocalRandom.current().nextInt(candidates.length)]; - tableCache.send(toSend); - } - } - } - if (toSend != null) { - locateInMeta(tableName, toSend); + tableCache.clearCompletedRequests(Optional.ofNullable(loc)); + // Remove a complete locate request in a synchronized block, so the table cache must have + // quota to send a candidate request. + toSend = tableCache.getCandidate(); + toSend.ifPresent(r -> tableCache.send(r)); } + toSend.ifPresent(r -> locateInMeta(tableName, r)); } private void onScanComplete(TableName tableName, LocateRequest req, List results, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java index 0bb192ba807..80ed02e3366 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java @@ -209,6 +209,7 @@ public class TestAsyncNonMetaRegionLocator { throw new RuntimeException(e); } })); + LOCATOR.clearCache(TABLE_NAME); byte[][] endKeys = getEndKeys(); IntStream.range(0, 2).forEach(