HBASE-27093 AsyncNonMetaRegionLocator:put Complete CompletableFuture outside lock block (#4496)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
93996bd578
commit
176c43c5ad
|
@ -35,10 +35,12 @@ import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
|
|||
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
@ -122,6 +124,26 @@ class AsyncNonMetaRegionLocator {
|
|||
}
|
||||
}
|
||||
|
||||
private static final class RegionLocationsFutureResult {
|
||||
private final CompletableFuture<RegionLocations> future;
|
||||
private final RegionLocations result;
|
||||
private final Throwable e;
|
||||
|
||||
public RegionLocationsFutureResult(CompletableFuture<RegionLocations> future,
|
||||
RegionLocations result, Throwable e) {
|
||||
this.future = future;
|
||||
this.result = result;
|
||||
this.e = e;
|
||||
}
|
||||
|
||||
public void complete() {
|
||||
if (e != null) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
future.complete(result);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class TableCache {
|
||||
|
||||
private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
|
||||
|
@ -148,18 +170,20 @@ class AsyncNonMetaRegionLocator {
|
|||
return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
|
||||
}
|
||||
|
||||
public void clearCompletedRequests(RegionLocations locations) {
|
||||
public List<RegionLocationsFutureResult> clearCompletedRequests(RegionLocations locations) {
|
||||
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
|
||||
for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
|
||||
allRequests.entrySet().iterator(); iter.hasNext();) {
|
||||
Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
|
||||
if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
|
||||
if (tryComplete(entry.getKey(), entry.getValue(), locations, futureResultList)) {
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
return futureResultList;
|
||||
}
|
||||
|
||||
private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
|
||||
RegionLocations locations) {
|
||||
RegionLocations locations, List<RegionLocationsFutureResult> futureResultList) {
|
||||
if (future.isDone()) {
|
||||
return true;
|
||||
}
|
||||
|
@ -185,7 +209,7 @@ class AsyncNonMetaRegionLocator {
|
|||
completed = loc.getRegion().containsRow(req.row);
|
||||
}
|
||||
if (completed) {
|
||||
future.complete(locations);
|
||||
futureResultList.add(new RegionLocationsFutureResult(future, locations, null));
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
|
@ -319,32 +343,36 @@ class AsyncNonMetaRegionLocator {
|
|||
TableCache tableCache = getTableCache(tableName);
|
||||
if (locs != null) {
|
||||
RegionLocations addedLocs = addToCache(tableCache, locs);
|
||||
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
|
||||
synchronized (tableCache) {
|
||||
tableCache.pendingRequests.remove(req);
|
||||
tableCache.clearCompletedRequests(addedLocs);
|
||||
futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
|
||||
// Remove a complete locate request in a synchronized block, so the table cache must have
|
||||
// quota to send a candidate request.
|
||||
toSend = tableCache.getCandidate();
|
||||
toSend.ifPresent(r -> tableCache.send(r));
|
||||
}
|
||||
futureResultList.forEach(RegionLocationsFutureResult::complete);
|
||||
toSend.ifPresent(r -> locateInMeta(tableName, r));
|
||||
} else {
|
||||
// we meet an error
|
||||
assert error != null;
|
||||
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
|
||||
synchronized (tableCache) {
|
||||
tableCache.pendingRequests.remove(req);
|
||||
// fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
|
||||
// already retried several times
|
||||
CompletableFuture<?> future = tableCache.allRequests.remove(req);
|
||||
CompletableFuture<RegionLocations> future = tableCache.allRequests.remove(req);
|
||||
if (future != null) {
|
||||
future.completeExceptionally(error);
|
||||
futureResultList.add(new RegionLocationsFutureResult(future, null, error));
|
||||
}
|
||||
tableCache.clearCompletedRequests(null);
|
||||
futureResultList.addAll(tableCache.clearCompletedRequests(null));
|
||||
// Remove a complete locate request in a synchronized block, so the table cache must have
|
||||
// quota to send a candidate request.
|
||||
toSend = tableCache.getCandidate();
|
||||
toSend.ifPresent(r -> tableCache.send(r));
|
||||
}
|
||||
futureResultList.forEach(RegionLocationsFutureResult::complete);
|
||||
toSend.ifPresent(r -> locateInMeta(tableName, r));
|
||||
}
|
||||
}
|
||||
|
@ -542,9 +570,11 @@ class AsyncNonMetaRegionLocator {
|
|||
continue;
|
||||
}
|
||||
RegionLocations addedLocs = addToCache(tableCache, locs);
|
||||
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
|
||||
synchronized (tableCache) {
|
||||
tableCache.clearCompletedRequests(addedLocs);
|
||||
futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
|
||||
}
|
||||
futureResultList.forEach(RegionLocationsFutureResult::complete);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -676,12 +706,16 @@ class AsyncNonMetaRegionLocator {
|
|||
if (tableCache == null) {
|
||||
return;
|
||||
}
|
||||
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
|
||||
synchronized (tableCache) {
|
||||
if (!tableCache.allRequests.isEmpty()) {
|
||||
IOException error = new IOException("Cache cleared");
|
||||
tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
|
||||
tableCache.allRequests.values().forEach(f -> {
|
||||
futureResultList.add(new RegionLocationsFutureResult(f, null, error));
|
||||
});
|
||||
}
|
||||
}
|
||||
futureResultList.forEach(RegionLocationsFutureResult::complete);
|
||||
conn.getConnectionMetrics()
|
||||
.ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue