HBASE-17402 TestAsyncTableScan sometimes hangs
This commit is contained in:
parent
9ec0ec4922
commit
af9d359b8e
|
@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.RegionLocations;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Scan.ReadType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
/**
|
||||
|
@ -140,8 +139,8 @@ class AsyncNonMetaRegionLocator {
|
|||
return;
|
||||
}
|
||||
tableCache.cache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> {
|
||||
if (oldLoc.getSeqNum() > loc.getSeqNum()
|
||||
|| !oldLoc.getServerName().equals(loc.getServerName())) {
|
||||
if (oldLoc.getSeqNum() > loc.getSeqNum() ||
|
||||
!oldLoc.getServerName().equals(loc.getServerName())) {
|
||||
return oldLoc;
|
||||
}
|
||||
return null;
|
||||
|
@ -158,11 +157,11 @@ class AsyncNonMetaRegionLocator {
|
|||
if (oldLoc == null) {
|
||||
return true;
|
||||
}
|
||||
if (oldLoc.getSeqNum() > loc.getSeqNum()
|
||||
|| oldLoc.getServerName().equals(loc.getServerName())) {
|
||||
if (oldLoc.getSeqNum() > loc.getSeqNum() ||
|
||||
oldLoc.getServerName().equals(loc.getServerName())) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc
|
||||
+ " is newer than us or has the same server name");
|
||||
LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc +
|
||||
" is newer than us or has the same server name");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -171,9 +170,9 @@ class AsyncNonMetaRegionLocator {
|
|||
return loc;
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue
|
||||
+ " is newer than us or has the same server name."
|
||||
+ " Maybe it is updated before we replace it");
|
||||
LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue +
|
||||
" is newer than us or has the same server name." +
|
||||
" Maybe it is updated before we replace it");
|
||||
}
|
||||
return oldValue;
|
||||
});
|
||||
|
@ -217,8 +216,8 @@ class AsyncNonMetaRegionLocator {
|
|||
Throwable error) {
|
||||
if (error != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Failed to locate region in '" + tableName + "', row='"
|
||||
+ Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType,
|
||||
LOG.debug("Failed to locate region in '" + tableName + "', row='" +
|
||||
Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType,
|
||||
error);
|
||||
}
|
||||
}
|
||||
|
@ -250,14 +249,15 @@ class AsyncNonMetaRegionLocator {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (!tableCache.allRequests.isEmpty()
|
||||
&& tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) {
|
||||
if (!tableCache.allRequests.isEmpty() &&
|
||||
tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) {
|
||||
LocateRequest[] candidates = tableCache.allRequests.keySet().stream()
|
||||
.filter(r -> !tableCache.isPending(r)).toArray(LocateRequest[]::new);
|
||||
if (candidates.length > 0) {
|
||||
// TODO: use a better algorithm to send a request which is more likely to fetch a new
|
||||
// location.
|
||||
toSend = candidates[ThreadLocalRandom.current().nextInt(candidates.length)];
|
||||
tableCache.send(toSend);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -278,8 +278,8 @@ class AsyncNonMetaRegionLocator {
|
|||
}
|
||||
RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The fetched location of '" + tableName + "', row='" + Bytes.toStringBinary(req.row)
|
||||
+ "', locateType=" + req.locateType + " is " + locs);
|
||||
LOG.debug("The fetched location of '" + tableName + "', row='" +
|
||||
Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType + " is " + locs);
|
||||
}
|
||||
if (locs == null || locs.getDefaultRegionLocation() == null) {
|
||||
complete(tableName, req, null,
|
||||
|
@ -303,13 +303,13 @@ class AsyncNonMetaRegionLocator {
|
|||
if (info.isSplit()) {
|
||||
complete(tableName, req, null,
|
||||
new RegionOfflineException(
|
||||
"the only available region for the required row is a split parent,"
|
||||
+ " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"));
|
||||
"the only available region for the required row is a split parent," +
|
||||
" the daughters should be online soon: '" + info.getRegionNameAsString() + "'"));
|
||||
return;
|
||||
}
|
||||
if (info.isOffline()) {
|
||||
complete(tableName, req, null, new RegionOfflineException("the region is offline, could"
|
||||
+ " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"));
|
||||
complete(tableName, req, null, new RegionOfflineException("the region is offline, could" +
|
||||
" be caused by a disable table call: '" + info.getRegionNameAsString() + "'"));
|
||||
return;
|
||||
}
|
||||
if (loc.getServerName() == null) {
|
||||
|
@ -331,8 +331,8 @@ class AsyncNonMetaRegionLocator {
|
|||
byte[] endKey = loc.getRegionInfo().getEndKey();
|
||||
if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
|
||||
+ Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT);
|
||||
LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
|
||||
Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT);
|
||||
}
|
||||
return loc;
|
||||
} else {
|
||||
|
@ -348,11 +348,11 @@ class AsyncNonMetaRegionLocator {
|
|||
return null;
|
||||
}
|
||||
HRegionLocation loc = entry.getValue();
|
||||
if (isEmptyStopRow(loc.getRegionInfo().getEndKey())
|
||||
|| Bytes.compareTo(loc.getRegionInfo().getEndKey(), row) >= 0) {
|
||||
if (isEmptyStopRow(loc.getRegionInfo().getEndKey()) ||
|
||||
Bytes.compareTo(loc.getRegionInfo().getEndKey(), row) >= 0) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
|
||||
+ Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE);
|
||||
LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
|
||||
Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE);
|
||||
}
|
||||
return loc;
|
||||
} else {
|
||||
|
@ -362,8 +362,8 @@ class AsyncNonMetaRegionLocator {
|
|||
|
||||
private void locateInMeta(TableName tableName, LocateRequest req) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row)
|
||||
+ "', locateType=" + req.locateType + " in meta");
|
||||
LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) +
|
||||
"', locateType=" + req.locateType + " in meta");
|
||||
}
|
||||
byte[] metaKey;
|
||||
if (req.locateType.equals(RegionLocateType.BEFORE)) {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
|
||||
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
|
@ -27,6 +28,8 @@ import static org.junit.Assert.assertThat;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.stream.IntStream;
|
||||
|
@ -166,10 +169,7 @@ public class TestAsyncNonMetaRegionLocator {
|
|||
return endKeys;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiRegionTable() throws IOException, InterruptedException {
|
||||
createMultiRegionTable();
|
||||
byte[][] startKeys = getStartKeys();
|
||||
private ServerName[] getLocations(byte[][] startKeys) {
|
||||
ServerName[] serverNames = new ServerName[startKeys.length];
|
||||
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
|
||||
.forEach(rs -> {
|
||||
|
@ -178,6 +178,14 @@ public class TestAsyncNonMetaRegionLocator {
|
|||
Bytes::compareTo)] = rs.getServerName();
|
||||
});
|
||||
});
|
||||
return serverNames;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiRegionTable() throws IOException, InterruptedException {
|
||||
createMultiRegionTable();
|
||||
byte[][] startKeys = getStartKeys();
|
||||
ServerName[] serverNames = getLocations(startKeys);
|
||||
IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
|
||||
try {
|
||||
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
|
||||
|
@ -264,4 +272,24 @@ public class TestAsyncNonMetaRegionLocator {
|
|||
|
||||
assertSame(afterLoc, LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER).get());
|
||||
}
|
||||
|
||||
// For HBASE-17402
|
||||
@Test
|
||||
public void testConcurrentLocate() throws IOException, InterruptedException, ExecutionException {
|
||||
createMultiRegionTable();
|
||||
byte[][] startKeys = getStartKeys();
|
||||
byte[][] endKeys = getEndKeys();
|
||||
ServerName[] serverNames = getLocations(startKeys);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
LOCATOR.clearCache(TABLE_NAME);
|
||||
List<CompletableFuture<HRegionLocation>> futures = IntStream.range(0, 1000)
|
||||
.mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
|
||||
.map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT))
|
||||
.collect(toList());
|
||||
for (int j = 0; j < 1000; j++) {
|
||||
int index = Math.min(8, j / 111);
|
||||
assertLocEquals(startKeys[index], endKeys[index], serverNames[index], futures.get(j).get());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue