HBASE-17334 Add locate row before/after support for AsyncRegionLocator
This commit is contained in:
parent
66781864aa
commit
09bb428763
|
@ -118,23 +118,22 @@ class AsyncClientScanner {
|
|||
.setScan(scan).consumer(consumer).resultCache(resultCache)
|
||||
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start()
|
||||
.whenComplete((locateToPreviousRegion, error) -> {
|
||||
.whenComplete((locateType, error) -> {
|
||||
if (error != null) {
|
||||
consumer.onError(error);
|
||||
return;
|
||||
}
|
||||
if (locateToPreviousRegion == null) {
|
||||
if (locateType == null) {
|
||||
consumer.onComplete();
|
||||
} else {
|
||||
openScanner(locateToPreviousRegion.booleanValue());
|
||||
openScanner(locateType);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void openScanner(boolean locateToPreviousRegion) {
|
||||
private void openScanner(RegionLocateType locateType) {
|
||||
conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
|
||||
.locateToPreviousRegion(locateToPreviousRegion)
|
||||
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.locateType(locateType).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::callOpenScanner).call()
|
||||
.whenComplete((resp, error) -> {
|
||||
if (error != null) {
|
||||
|
@ -146,6 +145,7 @@ class AsyncClientScanner {
|
|||
}
|
||||
|
||||
public void start() {
|
||||
openScanner(scan.isReversed() && isEmptyStartRow(scan.getStartRow()));
|
||||
openScanner(scan.isReversed() && isEmptyStartRow(scan.getStartRow()) ? RegionLocateType.BEFORE
|
||||
: RegionLocateType.CURRENT);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -374,7 +374,8 @@ class AsyncMultiGetRpcRetryingCaller {
|
|||
new ConcurrentHashMap<>();
|
||||
ConcurrentLinkedQueue<Get> locateFailed = new ConcurrentLinkedQueue<>();
|
||||
CompletableFuture.allOf(gets.map(get -> conn.getLocator()
|
||||
.getRegionLocation(tableName, get.getRow(), locateTimeoutNs).whenComplete((loc, error) -> {
|
||||
.getRegionLocation(tableName, get.getRow(), RegionLocateType.CURRENT, locateTimeoutNs)
|
||||
.whenComplete((loc, error) -> {
|
||||
if (error != null) {
|
||||
error = translateException(error);
|
||||
if (error instanceof DoNotRetryIOException) {
|
||||
|
|
|
@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.HConstants.ZEROES;
|
|||
import static org.apache.hadoop.hbase.HRegionInfo.createRegionName;
|
||||
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
|
||||
import static org.apache.hadoop.hbase.client.AsyncRegionLocator.updateCachedLoation;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
|
||||
import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
|
||||
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
|
||||
|
@ -77,16 +78,16 @@ class AsyncNonMetaRegionLocator {
|
|||
|
||||
public final byte[] row;
|
||||
|
||||
public final boolean locateToPrevious;
|
||||
public final RegionLocateType locateType;
|
||||
|
||||
public LocateRequest(byte[] row, boolean locateToPrevious) {
|
||||
public LocateRequest(byte[] row, RegionLocateType locateType) {
|
||||
this.row = row;
|
||||
this.locateToPrevious = locateToPrevious;
|
||||
this.locateType = locateType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Bytes.hashCode(row) ^ Boolean.hashCode(locateToPrevious);
|
||||
return Bytes.hashCode(row) ^ locateType.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -95,7 +96,7 @@ class AsyncNonMetaRegionLocator {
|
|||
return false;
|
||||
}
|
||||
LocateRequest that = (LocateRequest) obj;
|
||||
return locateToPrevious == that.locateToPrevious && Bytes.equals(row, that.row);
|
||||
return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -192,8 +193,14 @@ class AsyncNonMetaRegionLocator {
|
|||
return true;
|
||||
}
|
||||
boolean completed;
|
||||
if (req.locateToPrevious) {
|
||||
completed = Bytes.equals(loc.getRegionInfo().getEndKey(), req.row);
|
||||
if (req.locateType.equals(RegionLocateType.BEFORE)) {
|
||||
// for locating the row before current row, the common case is to find the previous region in
|
||||
// reverse scan, so we check the endKey first. In general, the condition should be startKey <
|
||||
// req.row and endKey >= req.row. Here we split it to endKey == req.row || (endKey > req.row
|
||||
// && startKey < req.row). The two conditions are equal since startKey < endKey.
|
||||
int c = Bytes.compareTo(loc.getRegionInfo().getEndKey(), req.row);
|
||||
completed =
|
||||
c == 0 || (c > 0 && Bytes.compareTo(loc.getRegionInfo().getStartKey(), req.row) < 0);
|
||||
} else {
|
||||
completed = loc.getRegionInfo().containsRow(req.row);
|
||||
}
|
||||
|
@ -206,11 +213,11 @@ class AsyncNonMetaRegionLocator {
|
|||
}
|
||||
|
||||
private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
|
||||
Throwable error, String rowNameInErrorMsg) {
|
||||
Throwable error) {
|
||||
if (error != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Failed to locate region in '" + tableName + "', " + rowNameInErrorMsg + "='"
|
||||
+ Bytes.toStringBinary(req.row) + "'",
|
||||
LOG.debug("Failed to locate region in '" + tableName + "', row='"
|
||||
+ Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType,
|
||||
error);
|
||||
}
|
||||
}
|
||||
|
@ -254,87 +261,67 @@ class AsyncNonMetaRegionLocator {
|
|||
}
|
||||
}
|
||||
if (toSend != null) {
|
||||
if (toSend.locateToPrevious) {
|
||||
locatePreviousInMeta(tableName, toSend);
|
||||
} else {
|
||||
locateInMeta(tableName, toSend);
|
||||
}
|
||||
locateInMeta(tableName, toSend);
|
||||
}
|
||||
}
|
||||
|
||||
private void onScanComplete(TableName tableName, LocateRequest req, List<Result> results,
|
||||
Throwable error, String rowNameInErrorMsg) {
|
||||
Throwable error) {
|
||||
if (error != null) {
|
||||
complete(tableName, req, null, error, rowNameInErrorMsg);
|
||||
complete(tableName, req, null, error);
|
||||
return;
|
||||
}
|
||||
if (results.isEmpty()) {
|
||||
complete(tableName, req, null, new TableNotFoundException(tableName), rowNameInErrorMsg);
|
||||
complete(tableName, req, null, new TableNotFoundException(tableName));
|
||||
return;
|
||||
}
|
||||
RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The fetched location of '" + tableName + "', " + rowNameInErrorMsg + "='"
|
||||
+ Bytes.toStringBinary(req.row) + "' is " + locs);
|
||||
LOG.debug("The fetched location of '" + tableName + "', row='" + Bytes.toStringBinary(req.row)
|
||||
+ "', locateType=" + req.locateType + " is " + locs);
|
||||
}
|
||||
if (locs == null || locs.getDefaultRegionLocation() == null) {
|
||||
complete(tableName, req, null,
|
||||
new IOException(String.format("No location found for '%s', %s='%s'", tableName,
|
||||
rowNameInErrorMsg, Bytes.toStringBinary(req.row))),
|
||||
rowNameInErrorMsg);
|
||||
new IOException(String.format("No location found for '%s', row='%s', locateType=%s",
|
||||
tableName, Bytes.toStringBinary(req.row), req.locateType)));
|
||||
return;
|
||||
}
|
||||
HRegionLocation loc = locs.getDefaultRegionLocation();
|
||||
HRegionInfo info = loc.getRegionInfo();
|
||||
if (info == null) {
|
||||
complete(tableName, req, null,
|
||||
new IOException(String.format("HRegionInfo is null for '%s', %s='%s'", tableName,
|
||||
rowNameInErrorMsg, Bytes.toStringBinary(req.row))),
|
||||
rowNameInErrorMsg);
|
||||
new IOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
|
||||
tableName, Bytes.toStringBinary(req.row), req.locateType)));
|
||||
return;
|
||||
}
|
||||
if (!info.getTable().equals(tableName)) {
|
||||
complete(tableName, req, null,
|
||||
new TableNotFoundException(
|
||||
"Table '" + tableName + "' was not found, got: '" + info.getTable() + "'"),
|
||||
rowNameInErrorMsg);
|
||||
complete(tableName, req, null, new TableNotFoundException(
|
||||
"Table '" + tableName + "' was not found, got: '" + info.getTable() + "'"));
|
||||
return;
|
||||
}
|
||||
if (info.isSplit()) {
|
||||
complete(tableName, req, null,
|
||||
new RegionOfflineException(
|
||||
"the only available region for the required row is a split parent,"
|
||||
+ " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"),
|
||||
rowNameInErrorMsg);
|
||||
+ " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"));
|
||||
return;
|
||||
}
|
||||
if (info.isOffline()) {
|
||||
complete(tableName, req, null,
|
||||
new RegionOfflineException("the region is offline, could"
|
||||
+ " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"),
|
||||
rowNameInErrorMsg);
|
||||
complete(tableName, req, null, new RegionOfflineException("the region is offline, could"
|
||||
+ " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"));
|
||||
return;
|
||||
}
|
||||
if (loc.getServerName() == null) {
|
||||
complete(tableName, req, null,
|
||||
new NoServerForRegionException(
|
||||
String.format("No server address listed for region '%s', %s='%s'",
|
||||
info.getRegionNameAsString(), rowNameInErrorMsg, Bytes.toStringBinary(req.row))),
|
||||
rowNameInErrorMsg);
|
||||
String.format("No server address listed for region '%s', row='%s', locateType=%s",
|
||||
info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType)));
|
||||
return;
|
||||
}
|
||||
if (req.locateToPrevious && !Bytes.equals(info.getEndKey(), req.row)) {
|
||||
complete(tableName, req, null,
|
||||
new DoNotRetryIOException("The end key of '" + info.getRegionNameAsString() + "' is '"
|
||||
+ Bytes.toStringBinary(info.getEndKey()) + "', expected '"
|
||||
+ Bytes.toStringBinary(req.row) + "'"),
|
||||
rowNameInErrorMsg);
|
||||
return;
|
||||
}
|
||||
complete(tableName, req, loc, null, rowNameInErrorMsg);
|
||||
complete(tableName, req, loc, null);
|
||||
}
|
||||
|
||||
private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row) {
|
||||
private HRegionLocation locateRowInCache(TableCache tableCache, TableName tableName, byte[] row) {
|
||||
Map.Entry<byte[], HRegionLocation> entry = tableCache.cache.floorEntry(row);
|
||||
if (entry == null) {
|
||||
return null;
|
||||
|
@ -344,7 +331,7 @@ class AsyncNonMetaRegionLocator {
|
|||
if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
|
||||
+ Bytes.toStringBinary(row) + "'");
|
||||
+ Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT);
|
||||
}
|
||||
return loc;
|
||||
} else {
|
||||
|
@ -352,22 +339,19 @@ class AsyncNonMetaRegionLocator {
|
|||
}
|
||||
}
|
||||
|
||||
private HRegionLocation locatePreviousInCache(TableCache tableCache, TableName tableName,
|
||||
byte[] startRowOfCurrentRegion) {
|
||||
Map.Entry<byte[], HRegionLocation> entry;
|
||||
if (isEmptyStopRow(startRowOfCurrentRegion)) {
|
||||
entry = tableCache.cache.lastEntry();
|
||||
} else {
|
||||
entry = tableCache.cache.lowerEntry(startRowOfCurrentRegion);
|
||||
}
|
||||
private HRegionLocation locateRowBeforeInCache(TableCache tableCache, TableName tableName,
|
||||
byte[] row) {
|
||||
Map.Entry<byte[], HRegionLocation> entry =
|
||||
isEmptyStopRow(row) ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
|
||||
if (entry == null) {
|
||||
return null;
|
||||
}
|
||||
HRegionLocation loc = entry.getValue();
|
||||
if (Bytes.equals(loc.getRegionInfo().getEndKey(), startRowOfCurrentRegion)) {
|
||||
if (isEmptyStopRow(loc.getRegionInfo().getEndKey())
|
||||
|| Bytes.compareTo(loc.getRegionInfo().getEndKey(), row) >= 0) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Found " + loc + " in cache for '" + tableName + "', startRowOfCurrentRegion='"
|
||||
+ Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
|
||||
LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
|
||||
+ Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE);
|
||||
}
|
||||
return loc;
|
||||
} else {
|
||||
|
@ -377,46 +361,41 @@ class AsyncNonMetaRegionLocator {
|
|||
|
||||
private void locateInMeta(TableName tableName, LocateRequest req) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(
|
||||
"Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) + "' in meta");
|
||||
}
|
||||
byte[] metaKey = createRegionName(tableName, req.row, NINES, false);
|
||||
conn.getRawTable(META_TABLE_NAME)
|
||||
.smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
|
||||
.whenComplete((results, error) -> onScanComplete(tableName, req, results, error, "row"));
|
||||
}
|
||||
|
||||
private void locatePreviousInMeta(TableName tableName, LocateRequest req) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Try locate '" + tableName + "', startRowOfCurrentRegion='"
|
||||
+ Bytes.toStringBinary(req.row) + "' in meta");
|
||||
LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row)
|
||||
+ "', locateType=" + req.locateType + " in meta");
|
||||
}
|
||||
byte[] metaKey;
|
||||
if (isEmptyStopRow(req.row)) {
|
||||
byte[] binaryTableName = tableName.getName();
|
||||
metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
|
||||
if (req.locateType.equals(RegionLocateType.BEFORE)) {
|
||||
if (isEmptyStopRow(req.row)) {
|
||||
byte[] binaryTableName = tableName.getName();
|
||||
metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
|
||||
} else {
|
||||
metaKey = createRegionName(tableName, req.row, ZEROES, false);
|
||||
}
|
||||
} else {
|
||||
metaKey = createRegionName(tableName, req.row, ZEROES, false);
|
||||
metaKey = createRegionName(tableName, req.row, NINES, false);
|
||||
}
|
||||
conn.getRawTable(META_TABLE_NAME)
|
||||
.smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
|
||||
.whenComplete((results, error) -> onScanComplete(tableName, req, results, error,
|
||||
"startRowOfCurrentRegion"));
|
||||
.whenComplete((results, error) -> onScanComplete(tableName, req, results, error));
|
||||
}
|
||||
|
||||
private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row,
|
||||
boolean locateToPrevious) {
|
||||
return locateToPrevious ? locatePreviousInCache(tableCache, tableName, row)
|
||||
: locateInCache(tableCache, tableName, row);
|
||||
RegionLocateType locateType) {
|
||||
return locateType.equals(RegionLocateType.BEFORE)
|
||||
? locateRowBeforeInCache(tableCache, tableName, row)
|
||||
: locateRowInCache(tableCache, tableName, row);
|
||||
}
|
||||
|
||||
// locateToPrevious is true means we will use the start key of a region to locate the region
|
||||
// placed before it. Used for reverse scan. See the comment of
|
||||
// AsyncRegionLocator.getPreviousRegionLocation.
|
||||
private CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
|
||||
boolean locateToPrevious) {
|
||||
private CompletableFuture<HRegionLocation> getRegionLocationInternal(TableName tableName,
|
||||
byte[] row, RegionLocateType locateType) {
|
||||
// AFTER should be convert to CURRENT before calling this method
|
||||
assert !locateType.equals(RegionLocateType.AFTER);
|
||||
TableCache tableCache = getTableCache(tableName);
|
||||
HRegionLocation loc = locateInCache(tableCache, tableName, row, locateToPrevious);
|
||||
HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType);
|
||||
if (loc != null) {
|
||||
return CompletableFuture.completedFuture(loc);
|
||||
}
|
||||
|
@ -425,11 +404,11 @@ class AsyncNonMetaRegionLocator {
|
|||
boolean sendRequest = false;
|
||||
synchronized (tableCache) {
|
||||
// check again
|
||||
loc = locateInCache(tableCache, tableName, row, locateToPrevious);
|
||||
loc = locateInCache(tableCache, tableName, row, locateType);
|
||||
if (loc != null) {
|
||||
return CompletableFuture.completedFuture(loc);
|
||||
}
|
||||
req = new LocateRequest(row, locateToPrevious);
|
||||
req = new LocateRequest(row, locateType);
|
||||
future = tableCache.allRequests.get(req);
|
||||
if (future == null) {
|
||||
future = new CompletableFuture<>();
|
||||
|
@ -441,25 +420,23 @@ class AsyncNonMetaRegionLocator {
|
|||
}
|
||||
}
|
||||
if (sendRequest) {
|
||||
if (locateToPrevious) {
|
||||
locatePreviousInMeta(tableName, req);
|
||||
} else {
|
||||
locateInMeta(tableName, req);
|
||||
}
|
||||
locateInMeta(tableName, req);
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) {
|
||||
return getRegionLocation(tableName, row, false);
|
||||
}
|
||||
|
||||
// Used for reverse scan. See the comment of AsyncRegionLocator.getPreviousRegionLocation.
|
||||
// TODO: need to deal with region merge where the startRowOfCurrentRegion will not be the endRow
|
||||
// of a region.
|
||||
CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
|
||||
byte[] startRowOfCurrentRegion) {
|
||||
return getRegionLocation(tableName, startRowOfCurrentRegion, true);
|
||||
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
|
||||
RegionLocateType locateType) {
|
||||
if (locateType.equals(RegionLocateType.BEFORE)) {
|
||||
return getRegionLocationInternal(tableName, row, locateType);
|
||||
} else {
|
||||
// as we know the exact row after us, so we can just create the new row, and use the same
|
||||
// algorithm to locate it.
|
||||
if (locateType.equals(RegionLocateType.AFTER)) {
|
||||
row = createClosestRowAfter(row);
|
||||
}
|
||||
return getRegionLocationInternal(tableName, row, RegionLocateType.CURRENT);
|
||||
}
|
||||
}
|
||||
|
||||
void updateCachedLocation(HRegionLocation loc, Throwable exception) {
|
||||
|
|
|
@ -79,33 +79,18 @@ class AsyncRegionLocator {
|
|||
}
|
||||
|
||||
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
|
||||
long timeoutNs) {
|
||||
RegionLocateType type, long timeoutNs) {
|
||||
// meta region can not be split right now so we always call the same method.
|
||||
// Change it later if the meta table can have more than one regions.
|
||||
CompletableFuture<HRegionLocation> future =
|
||||
tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation()
|
||||
: nonMetaRegionLocator.getRegionLocation(tableName, row);
|
||||
: nonMetaRegionLocator.getRegionLocation(tableName, row, type);
|
||||
return withTimeout(future, timeoutNs,
|
||||
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
|
||||
+ "ms) waiting for region location for " + tableName + ", row='"
|
||||
+ Bytes.toStringBinary(row) + "'");
|
||||
}
|
||||
|
||||
/**
|
||||
* Locate the previous region using the current regions start key. Used for reverse scan as the
|
||||
* end key is not included in a region so we need to treat it differently.
|
||||
*/
|
||||
CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
|
||||
byte[] startRowOfCurrentRegion, long timeoutNs) {
|
||||
// meta region can not be split right now so we call the same method as getRegionLocation.
|
||||
// Change it later if the meta table can have more than one regions.
|
||||
CompletableFuture<HRegionLocation> future =
|
||||
tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation()
|
||||
: nonMetaRegionLocator.getPreviousRegionLocation(tableName, startRowOfCurrentRegion);
|
||||
return withTimeout(future, timeoutNs,
|
||||
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
|
||||
+ "ms) waiting for region location for " + tableName + ", startRowOfCurrentRegion='"
|
||||
+ Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
|
||||
}
|
||||
|
||||
static boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
|
||||
// Do not need to update if no such location, or the location is newer, or the location is not
|
||||
// same with us
|
||||
|
|
|
@ -58,7 +58,7 @@ class AsyncRpcRetryingCallerFactory {
|
|||
|
||||
private long rpcTimeoutNs = -1L;
|
||||
|
||||
private boolean locateToPreviousRegion;
|
||||
private RegionLocateType locateType = RegionLocateType.CURRENT;
|
||||
|
||||
public SingleRequestCallerBuilder<T> table(TableName tableName) {
|
||||
this.tableName = tableName;
|
||||
|
@ -86,15 +86,15 @@ class AsyncRpcRetryingCallerFactory {
|
|||
return this;
|
||||
}
|
||||
|
||||
public SingleRequestCallerBuilder<T> locateToPreviousRegion(boolean locateToPreviousRegion) {
|
||||
this.locateToPreviousRegion = locateToPreviousRegion;
|
||||
public SingleRequestCallerBuilder<T> locateType(RegionLocateType locateType) {
|
||||
this.locateType = locateType;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AsyncSingleRequestRpcRetryingCaller<T> build() {
|
||||
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
|
||||
checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
|
||||
locateToPreviousRegion, checkNotNull(callable, "action is null"),
|
||||
checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
|
||||
conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
|
||||
rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
|
||||
}
|
||||
|
@ -246,7 +246,7 @@ class AsyncRpcRetryingCallerFactory {
|
|||
/**
|
||||
* Short cut for {@code build().start()}.
|
||||
*/
|
||||
public CompletableFuture<Boolean> start() {
|
||||
public CompletableFuture<RegionLocateType> start() {
|
||||
return build().start();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -95,7 +95,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
|||
|
||||
private final Runnable completeWhenNoMoreResultsInRegion;
|
||||
|
||||
private final CompletableFuture<Boolean> future;
|
||||
private final CompletableFuture<RegionLocateType> future;
|
||||
|
||||
private final HBaseRpcController controller;
|
||||
|
||||
|
@ -172,7 +172,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
|||
|
||||
private void completeWithNextStartRow(byte[] nextStartRow) {
|
||||
scan.setStartRow(nextStartRow);
|
||||
future.complete(scan.isReversed());
|
||||
future.complete(scan.isReversed() ? RegionLocateType.BEFORE : RegionLocateType.CURRENT);
|
||||
}
|
||||
|
||||
private byte[] createNextStartRowWhenError() {
|
||||
|
@ -193,7 +193,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
|||
includeNextStartRowWhenError ? nextStartRowWhenError : createNextStartRowWhenError.get());
|
||||
}
|
||||
future.complete(
|
||||
scan.isReversed() && Bytes.equals(scan.getStartRow(), loc.getRegionInfo().getEndKey()));
|
||||
scan.isReversed() && Bytes.equals(scan.getStartRow(), loc.getRegionInfo().getEndKey())
|
||||
? RegionLocateType.BEFORE : RegionLocateType.CURRENT);
|
||||
}
|
||||
|
||||
private void onError(Throwable error) {
|
||||
|
@ -344,7 +345,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
|||
/**
|
||||
* @return return locate direction for next open scanner call, or null if we should stop.
|
||||
*/
|
||||
public CompletableFuture<Boolean> start() {
|
||||
public CompletableFuture<RegionLocateType> start() {
|
||||
next();
|
||||
return future;
|
||||
}
|
||||
|
|
|
@ -31,7 +31,6 @@ import java.util.List;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -67,7 +66,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
|
|||
|
||||
private final byte[] row;
|
||||
|
||||
private final Function<Long, CompletableFuture<HRegionLocation>> locate;
|
||||
private final RegionLocateType locateType;
|
||||
|
||||
private final Callable<T> callable;
|
||||
|
||||
|
@ -90,18 +89,14 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
|
|||
private final long startNs;
|
||||
|
||||
public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
|
||||
TableName tableName, byte[] row, boolean locateToPreviousRegion, Callable<T> callable,
|
||||
TableName tableName, byte[] row, RegionLocateType locateType, Callable<T> callable,
|
||||
long pauseNs, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs,
|
||||
int startLogErrorsCnt) {
|
||||
this.retryTimer = retryTimer;
|
||||
this.conn = conn;
|
||||
this.tableName = tableName;
|
||||
this.row = row;
|
||||
if (locateToPreviousRegion) {
|
||||
this.locate = this::locatePrevious;
|
||||
} else {
|
||||
this.locate = this::locate;
|
||||
}
|
||||
this.locateType = locateType;
|
||||
this.callable = callable;
|
||||
this.pauseNs = pauseNs;
|
||||
this.maxAttempts = retries2Attempts(maxRetries);
|
||||
|
@ -210,27 +205,20 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
|
|||
} else {
|
||||
locateTimeoutNs = -1L;
|
||||
}
|
||||
locate.apply(locateTimeoutNs).whenComplete((loc, error) -> {
|
||||
if (error != null) {
|
||||
onError(error,
|
||||
() -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = "
|
||||
+ tries + ", maxAttempts = " + maxAttempts + ", timeout = "
|
||||
+ TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
|
||||
+ elapsedMs() + " ms",
|
||||
err -> {
|
||||
});
|
||||
return;
|
||||
}
|
||||
call(loc);
|
||||
});
|
||||
}
|
||||
|
||||
private CompletableFuture<HRegionLocation> locate(long timeoutNs) {
|
||||
return conn.getLocator().getRegionLocation(tableName, row, timeoutNs);
|
||||
}
|
||||
|
||||
private CompletableFuture<HRegionLocation> locatePrevious(long timeoutNs) {
|
||||
return conn.getLocator().getPreviousRegionLocation(tableName, row, timeoutNs);
|
||||
conn.getLocator().getRegionLocation(tableName, row, locateType, locateTimeoutNs)
|
||||
.whenComplete((loc, error) -> {
|
||||
if (error != null) {
|
||||
onError(error,
|
||||
() -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName
|
||||
+ " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
|
||||
+ TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
|
||||
+ elapsedMs() + " ms",
|
||||
err -> {
|
||||
});
|
||||
return;
|
||||
}
|
||||
call(loc);
|
||||
});
|
||||
}
|
||||
|
||||
public CompletableFuture<T> call() {
|
||||
|
|
|
@ -144,7 +144,7 @@ class AsyncSmallScanRpcRetryingCaller {
|
|||
scan.setStartRow(
|
||||
createClosestNextRow.apply(resp.results[resp.results.length - 1].getRow()));
|
||||
}
|
||||
scan(false);
|
||||
scan(RegionLocateType.CURRENT);
|
||||
return;
|
||||
}
|
||||
if (!nextScan.apply(resp.currentRegion)) {
|
||||
|
@ -152,12 +152,11 @@ class AsyncSmallScanRpcRetryingCaller {
|
|||
}
|
||||
}
|
||||
|
||||
private void scan(boolean locateToPreviousRegion) {
|
||||
private void scan(RegionLocateType locateType) {
|
||||
conn.callerFactory.<SmallScanResponse> single().table(tableName).row(scan.getStartRow())
|
||||
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.locateToPreviousRegion(locateToPreviousRegion).action(this::scan).call()
|
||||
.whenComplete((resp, error) -> {
|
||||
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).locateType(locateType)
|
||||
.action(this::scan).call().whenComplete((resp, error) -> {
|
||||
if (error != null) {
|
||||
future.completeExceptionally(error);
|
||||
} else {
|
||||
|
@ -172,11 +171,11 @@ class AsyncSmallScanRpcRetryingCaller {
|
|||
}
|
||||
|
||||
private void firstScan() {
|
||||
scan(false);
|
||||
scan(RegionLocateType.CURRENT);
|
||||
}
|
||||
|
||||
private void reversedFirstScan() {
|
||||
scan(isEmptyStartRow(scan.getStartRow()));
|
||||
scan(isEmptyStartRow(scan.getStartRow()) ? RegionLocateType.BEFORE : RegionLocateType.CURRENT);
|
||||
}
|
||||
|
||||
private boolean nextScan(HRegionInfo region) {
|
||||
|
@ -190,7 +189,7 @@ class AsyncSmallScanRpcRetryingCaller {
|
|||
}
|
||||
}
|
||||
scan.setStartRow(region.getEndKey());
|
||||
scan(false);
|
||||
scan(RegionLocateType.CURRENT);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -205,7 +204,7 @@ class AsyncSmallScanRpcRetryingCaller {
|
|||
}
|
||||
}
|
||||
scan.setStartRow(region.getStartKey());
|
||||
scan(true);
|
||||
scan(RegionLocateType.BEFORE);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,6 +45,6 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
|
|||
|
||||
@Override
|
||||
public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
|
||||
return locator.getRegionLocation(tableName, row, 0L);
|
||||
return locator.getRegionLocation(tableName, row, RegionLocateType.CURRENT, -1L);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Indicate which row you want to locate.
|
||||
* <ul>
|
||||
* <li>{@link #BEFORE} locate the region which contains the row before the given row.</li>
|
||||
* <li>{@link #CURRENT} locate the region which contains the given row.</li>
|
||||
* <li>{@link #AFTER} locate the region which contains the row after the given row.</li>
|
||||
* </ul>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
enum RegionLocateType {
|
||||
BEFORE, CURRENT, AFTER
|
||||
}
|
|
@ -24,7 +24,6 @@ import static org.junit.Assert.assertArrayEquals;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
@ -102,15 +101,12 @@ public class TestAsyncNonMetaRegionLocator {
|
|||
|
||||
@Test
|
||||
public void testNoTable() throws InterruptedException {
|
||||
try {
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
|
||||
} catch (ExecutionException e) {
|
||||
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
|
||||
}
|
||||
try {
|
||||
LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
|
||||
} catch (ExecutionException e) {
|
||||
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
|
||||
for (RegionLocateType locateType : RegionLocateType.values()) {
|
||||
try {
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType).get();
|
||||
} catch (ExecutionException e) {
|
||||
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -118,15 +114,12 @@ public class TestAsyncNonMetaRegionLocator {
|
|||
public void testDisableTable() throws IOException, InterruptedException {
|
||||
createSingleRegionTable();
|
||||
TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
|
||||
try {
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
|
||||
} catch (ExecutionException e) {
|
||||
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
|
||||
}
|
||||
try {
|
||||
LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
|
||||
} catch (ExecutionException e) {
|
||||
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
|
||||
for (RegionLocateType locateType : RegionLocateType.values()) {
|
||||
try {
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType).get();
|
||||
} catch (ExecutionException e) {
|
||||
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -143,21 +136,15 @@ public class TestAsyncNonMetaRegionLocator {
|
|||
public void testSingleRegionTable() throws IOException, InterruptedException, ExecutionException {
|
||||
createSingleRegionTable();
|
||||
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
|
||||
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
|
||||
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
|
||||
LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
|
||||
for (RegionLocateType locateType : RegionLocateType.values()) {
|
||||
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType).get());
|
||||
}
|
||||
byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
|
||||
ThreadLocalRandom.current().nextBytes(randKey);
|
||||
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, randKey).get());
|
||||
// Use a key which is not the endKey of a region will cause error
|
||||
try {
|
||||
for (RegionLocateType locateType : RegionLocateType.values()) {
|
||||
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
|
||||
LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }).get());
|
||||
} catch (ExecutionException e) {
|
||||
assertThat(e.getCause(), instanceOf(IOException.class));
|
||||
assertTrue(e.getCause().getMessage().contains("end key of"));
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, randKey, locateType).get());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -194,7 +181,19 @@ public class TestAsyncNonMetaRegionLocator {
|
|||
IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
|
||||
try {
|
||||
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
|
||||
serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i]).get());
|
||||
serverNames[i],
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT).get());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}));
|
||||
|
||||
LOCATOR.clearCache(TABLE_NAME);
|
||||
IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
|
||||
try {
|
||||
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
|
||||
serverNames[i],
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER).get());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -205,7 +204,7 @@ public class TestAsyncNonMetaRegionLocator {
|
|||
n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
|
||||
try {
|
||||
assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
|
||||
LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i]).get());
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE).get());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -216,7 +215,8 @@ public class TestAsyncNonMetaRegionLocator {
|
|||
public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
|
||||
createSingleRegionTable();
|
||||
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
|
||||
HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
|
||||
HRegionLocation loc =
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT).get();
|
||||
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
|
||||
ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
|
||||
.map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
|
||||
|
@ -229,12 +229,39 @@ public class TestAsyncNonMetaRegionLocator {
|
|||
Thread.sleep(100);
|
||||
}
|
||||
// Should be same as it is in cache
|
||||
assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
|
||||
assertSame(loc,
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT).get());
|
||||
LOCATOR.updateCachedLocation(loc, null);
|
||||
// null error will not trigger a cache cleanup
|
||||
assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
|
||||
assertSame(loc,
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT).get());
|
||||
LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
|
||||
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT).get());
|
||||
}
|
||||
|
||||
// usually locate after will return the same result, so we add a test to make it return different
|
||||
// result.
|
||||
@Test
|
||||
public void testLocateAfter() throws IOException, InterruptedException, ExecutionException {
|
||||
byte[] row = Bytes.toBytes("1");
|
||||
byte[] splitKey = Arrays.copyOf(row, 2);
|
||||
TEST_UTIL.createTable(TABLE_NAME, FAMILY, new byte[][] { splitKey });
|
||||
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
||||
HRegionLocation currentLoc =
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT).get();
|
||||
ServerName currentServerName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
|
||||
assertLocEquals(EMPTY_START_ROW, splitKey, currentServerName, currentLoc);
|
||||
|
||||
HRegionLocation afterLoc =
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER).get();
|
||||
ServerName afterServerName =
|
||||
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
|
||||
.filter(rs -> rs.getOnlineRegions(TABLE_NAME).stream()
|
||||
.anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey())))
|
||||
.findAny().get().getServerName();
|
||||
assertLocEquals(splitKey, EMPTY_END_ROW, afterServerName, afterLoc);
|
||||
|
||||
assertSame(afterLoc, LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER).get());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,9 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static java.util.stream.Collectors.toCollection;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
|
||||
import static org.apache.hadoop.hbase.client.AsyncNonMetaRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
|
||||
|
@ -28,7 +26,6 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
@ -147,12 +144,10 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
|
|||
|
||||
@Test
|
||||
public void test() throws InterruptedException, ExecutionException {
|
||||
List<CompletableFuture<HRegionLocation>> futures = IntStream.range(0, 128)
|
||||
.mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
|
||||
.map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r)).collect(toCollection(ArrayList::new));
|
||||
futures.addAll(IntStream.range(129, 257)
|
||||
.mapToObj(i -> i < 256 ? Bytes.toBytes(String.format("%02x", i)) : EMPTY_START_ROW)
|
||||
.map(r -> LOCATOR.getPreviousRegionLocation(TABLE_NAME, r)).collect(toList()));
|
||||
List<CompletableFuture<HRegionLocation>> futures =
|
||||
IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
|
||||
.map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT))
|
||||
.collect(toList());
|
||||
assertLocs(futures);
|
||||
assertTrue(MAX_CONCURRENCY.get() <= MAX_ALLOWED);
|
||||
}
|
||||
|
|
|
@ -100,8 +100,8 @@ public class TestAsyncRegionLocatorTimeout {
|
|||
SLEEP_MS = 1000;
|
||||
long startNs = System.nanoTime();
|
||||
try {
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, TimeUnit.MILLISECONDS.toNanos(500))
|
||||
.get();
|
||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT,
|
||||
TimeUnit.MILLISECONDS.toNanos(500)).get();
|
||||
fail();
|
||||
} catch (ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
|
@ -113,8 +113,8 @@ public class TestAsyncRegionLocatorTimeout {
|
|||
// wait for the background task finish
|
||||
Thread.sleep(2000);
|
||||
// Now the location should be in cache, so we will not visit meta again.
|
||||
HRegionLocation loc = LOCATOR
|
||||
.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, TimeUnit.MILLISECONDS.toNanos(500)).get();
|
||||
HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW,
|
||||
RegionLocateType.CURRENT, TimeUnit.MILLISECONDS.toNanos(500)).get();
|
||||
assertEquals(loc.getServerName(),
|
||||
TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName());
|
||||
}
|
||||
|
|
|
@ -154,7 +154,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
|
|||
new AsyncRegionLocator(asyncConn, AsyncConnectionImpl.RETRY_TIMER) {
|
||||
@Override
|
||||
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
|
||||
long timeoutNs) {
|
||||
RegionLocateType locateType, long timeoutNs) {
|
||||
if (tableName.equals(TABLE_NAME)) {
|
||||
CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
|
||||
if (count.getAndIncrement() == 0) {
|
||||
|
@ -165,16 +165,10 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
|
|||
}
|
||||
return future;
|
||||
} else {
|
||||
return super.getRegionLocation(tableName, row, timeoutNs);
|
||||
return super.getRegionLocation(tableName, row, locateType, timeoutNs);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
|
||||
byte[] startRowOfCurrentRegion, long timeoutNs) {
|
||||
return super.getPreviousRegionLocation(tableName, startRowOfCurrentRegion, timeoutNs);
|
||||
}
|
||||
|
||||
@Override
|
||||
void updateCachedLocation(HRegionLocation loc, Throwable exception) {
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue