HBASE-18598 AsyncNonMetaRegionLocator use FIFO algorithm to get a candidate locate request

This commit is contained in:
Guanghao Zhang 2017-08-15 16:15:29 +08:00
parent 665fd0d07e
commit 59ffb6119b
2 changed files with 63 additions and 57 deletions

View File

@ -29,18 +29,18 @@ import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -107,7 +107,7 @@ class AsyncNonMetaRegionLocator {
public final Set<LocateRequest> pendingRequests = new HashSet<>();
public final Map<LocateRequest, CompletableFuture<HRegionLocation>> allRequests =
new HashMap<>();
new LinkedHashMap<>();
public boolean hasQuota(int max) {
return pendingRequests.size() < max;
@ -120,6 +120,49 @@ class AsyncNonMetaRegionLocator {
public void send(LocateRequest req) {
pendingRequests.add(req);
}
public Optional<LocateRequest> getCandidate() {
return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
}
public void clearCompletedRequests(Optional<HRegionLocation> location) {
for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter = allRequests
.entrySet().iterator(); iter.hasNext();) {
Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next();
if (tryComplete(entry.getKey(), entry.getValue(), location)) {
iter.remove();
}
}
}
private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future,
Optional<HRegionLocation> location) {
if (future.isDone()) {
return true;
}
if (!location.isPresent()) {
return false;
}
HRegionLocation loc = location.get();
boolean completed;
if (req.locateType.equals(RegionLocateType.BEFORE)) {
// for locating the row before current row, the common case is to find the previous region in
// reverse scan, so we check the endKey first. In general, the condition should be startKey <
// req.row and endKey >= req.row. Here we split it to endKey == req.row || (endKey > req.row
// && startKey < req.row). The two conditions are equal since startKey < endKey.
int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row);
completed =
c == 0 || (c > 0 && Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0);
} else {
completed = loc.getRegionInfo().containsRow(req.row);
}
if (completed) {
future.complete(loc);
return true;
} else {
return false;
}
}
}
AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
@ -186,48 +229,27 @@ class AsyncNonMetaRegionLocator {
}
}
private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future,
HRegionLocation loc) {
if (future.isDone()) {
return true;
}
boolean completed;
if (req.locateType.equals(RegionLocateType.BEFORE)) {
// for locating the row before current row, the common case is to find the previous region in
// reverse scan, so we check the endKey first. In general, the condition should be startKey <
// req.row and endKey >= req.row. Here we split it to endKey == req.row || (endKey > req.row
// && startKey < req.row). The two conditions are equal since startKey < endKey.
int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row);
completed =
c == 0 || (c > 0 && Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0);
} else {
completed = loc.getRegionInfo().containsRow(req.row);
}
if (completed) {
future.complete(loc);
return true;
} else {
return false;
}
}
private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
Throwable error) {
if (error != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to locate region in '" + tableName + "', row='" +
Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType,
error);
}
LOG.warn(
"Failed to locate region in '" + tableName + "', row='" + Bytes.toStringBinary(req.row)
+ "', locateType=" + req.locateType, error);
}
LocateRequest toSend = null;
Optional<LocateRequest> toSend = Optional.empty();
TableCache tableCache = getTableCache(tableName);
if (loc != null) {
if (!addToCache(tableCache, loc)) {
// someone is ahead of us.
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
tableCache.clearCompletedRequests(Optional.empty());
// Remove a complete locate request in a synchronized block, so the table cache must have
// quota to send a candidate request.
toSend = tableCache.getCandidate();
toSend.ifPresent(r -> tableCache.send(r));
}
toSend.ifPresent(r -> locateInMeta(tableName, r));
return;
}
}
@ -239,30 +261,13 @@ class AsyncNonMetaRegionLocator {
future.completeExceptionally(error);
}
}
if (loc != null) {
for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter =
tableCache.allRequests.entrySet().iterator(); iter.hasNext();) {
Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next();
if (tryComplete(entry.getKey(), entry.getValue(), loc)) {
iter.remove();
}
}
}
if (!tableCache.allRequests.isEmpty() &&
tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) {
LocateRequest[] candidates = tableCache.allRequests.keySet().stream()
.filter(r -> !tableCache.isPending(r)).toArray(LocateRequest[]::new);
if (candidates.length > 0) {
// TODO: use a better algorithm to send a request which is more likely to fetch a new
// location.
toSend = candidates[ThreadLocalRandom.current().nextInt(candidates.length)];
tableCache.send(toSend);
}
}
}
if (toSend != null) {
locateInMeta(tableName, toSend);
tableCache.clearCompletedRequests(Optional.ofNullable(loc));
// Remove a complete locate request in a synchronized block, so the table cache must have
// quota to send a candidate request.
toSend = tableCache.getCandidate();
toSend.ifPresent(r -> tableCache.send(r));
}
toSend.ifPresent(r -> locateInMeta(tableName, r));
}
private void onScanComplete(TableName tableName, LocateRequest req, List<Result> results,

View File

@ -209,6 +209,7 @@ public class TestAsyncNonMetaRegionLocator {
throw new RuntimeException(e);
}
}));
LOCATOR.clearCache(TABLE_NAME);
byte[][] endKeys = getEndKeys();
IntStream.range(0, 2).forEach(