HBASE-16945 Implement AsyncRegionLocator

commit 6cf9333e97 (parent 45a2594249)
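This change replaces the old blocking locator, a thin wrapper over a ConnectionImplementation, with a genuinely asynchronous one: the hbase:meta region is located through the cluster registry, user regions are located with a reverse small scan on hbase:meta, and results are cached per table in a map keyed by region start key. The sketch below illustrates the call pattern this enables; it is not part of the patch, the table and row names are hypothetical, and it sits in the client package because AsyncConnectionImpl and AsyncRegionLocator are package-private.

// Sketch only, not part of the patch; table and row names are hypothetical.
package org.apache.hadoop.hbase.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;

public class LocatorUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    AsyncConnectionImpl conn = new AsyncConnectionImpl(conf, User.getCurrent());
    // The lookup never blocks the calling thread: it completes from the per-table
    // cache when possible, otherwise from a reverse small scan on hbase:meta.
    conn.getLocator().getRegionLocation(TableName.valueOf("test"), Bytes.toBytes("row"))
        .whenComplete((loc, err) -> {
          if (err != null) {
            err.printStackTrace(); // lookup failed after cache miss and meta scan
          } else {
            System.out.println(loc); // HRegionLocation: region info plus server name
          }
        });
  }
}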
@@ -66,7 +66,7 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   private final User user;
 
-  private final AsyncRegistry registry;
+  final AsyncRegistry registry;
 
   private final String clusterId;
 
@@ -87,15 +87,11 @@ class AsyncConnectionImpl implements AsyncConnection {
   private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
 
   @SuppressWarnings("deprecation")
-  public AsyncConnectionImpl(Configuration conf, User user) throws IOException {
+  public AsyncConnectionImpl(Configuration conf, User user) {
     this.conf = conf;
     this.user = user;
-
     this.connConf = new AsyncConnectionConfiguration(conf);
-
-    this.locator = new AsyncRegionLocator(conf);
-
     // action below will not throw exception so no need to catch and close.
+    this.locator = new AsyncRegionLocator(this);
     this.registry = ClusterRegistryFactory.getRegistry(conf);
     this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
       if (LOG.isDebugEnabled()) {
@@ -122,7 +118,6 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   @Override
   public void close() {
-    IOUtils.closeQuietly(locator);
     IOUtils.closeQuietly(rpcClient);
     IOUtils.closeQuietly(registry);
   }
@@ -17,71 +17,437 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
+import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY;
+import static org.apache.hadoop.hbase.HConstants.NINES;
+import static org.apache.hadoop.hbase.HConstants.ZEROES;
+import static org.apache.hadoop.hbase.HRegionInfo.createRegionName;
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
+import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
 
-import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * TODO: reimplement using aync connection when the scan logic is ready. The current implementation
- * is based on the blocking client.
+ * The asynchronous region locator.
  */
 @InterfaceAudience.Private
-class AsyncRegionLocator implements Closeable {
+class AsyncRegionLocator {
 
-  private final ConnectionImplementation conn;
+  private static final Log LOG = LogFactory.getLog(AsyncRegionLocator.class);
 
-  AsyncRegionLocator(Configuration conf) throws IOException {
-    conn = (ConnectionImplementation) ConnectionFactory.createConnection(conf);
-  }
+  private final AsyncConnectionImpl conn;
+
+  private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
+
+  private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
+      new AtomicReference<>();
+
+  private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], HRegionLocation>> cache =
+      new ConcurrentHashMap<>();
 
-  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-      boolean reload) {
-    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
-    try {
-      future.complete(conn.getRegionLocation(tableName, row, reload));
-    } catch (IOException e) {
-      future.completeExceptionally(e);
-    }
-    return future;
+  AsyncRegionLocator(AsyncConnectionImpl conn) {
+    this.conn = conn;
   }
 
-  CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
-      byte[] startRowOfCurrentRegion, boolean reload) {
-    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
-    byte[] toLocateRow = createClosestRowBefore(startRowOfCurrentRegion);
-    try {
-      for (;;) {
-        HRegionLocation loc = conn.getRegionLocation(tableName, toLocateRow, reload);
-        byte[] endKey = loc.getRegionInfo().getEndKey();
-        if (Bytes.equals(startRowOfCurrentRegion, endKey)) {
-          future.complete(loc);
-          break;
-        }
-        toLocateRow = endKey;
-      }
-    } catch (IOException e) {
-      future.completeExceptionally(e);
-    }
-    return future;
+  private CompletableFuture<HRegionLocation> locateMetaRegion() {
+    for (;;) {
+      HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
+      if (metaRegionLocation != null) {
+        return CompletableFuture.completedFuture(metaRegionLocation);
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Meta region location cache is null, try fetching from registry.");
+      }
+      if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Start fetching meta region location from registry.");
+        }
+        CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+        conn.registry.getMetaRegionLocation().whenComplete((locs, error) -> {
+          if (error != null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Failed to fetch meta region location from registry", error);
+            }
+            metaRelocateFuture.getAndSet(null).completeExceptionally(error);
+            return;
+          }
+          HRegionLocation loc = locs.getDefaultRegionLocation();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("The fetched meta region location is " + loc);
+          }
+          // Here we update cache before reset future, so it is possible that someone can get a
+          // stale value. Consider this:
+          // 1. update cache
+          // 2. someone clear the cache and relocate again
+          // 3. the metaRelocateFuture is not null so the old future is used.
+          // 4. we clear metaRelocateFuture and complete the future in it with the value being
+          // cleared in step 2.
+          // But we do not think it is a big deal as it rarely happens, and even if it happens, the
+          // caller will retry again later, no correctness problems.
+          this.metaRegionLocation.set(loc);
+          metaRelocateFuture.set(null);
+          future.complete(loc);
+        });
+        return future;
+      } else {
+        CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
+        if (future != null) {
+          return future;
+        }
+      }
+    }
+  }
+
+  private static ConcurrentNavigableMap<byte[], HRegionLocation> createTableCache() {
+    return new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+  }
+
+  private void removeFromCache(HRegionLocation loc) {
+    ConcurrentNavigableMap<byte[], HRegionLocation> tableCache =
+        cache.get(loc.getRegionInfo().getTable());
+    if (tableCache == null) {
+      return;
+    }
+    tableCache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> {
+      if (oldLoc.getSeqNum() > loc.getSeqNum()
+          || !oldLoc.getServerName().equals(loc.getServerName())) {
+        return oldLoc;
+      }
+      return null;
+    });
+  }
+
+  private void addToCache(HRegionLocation loc) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Try adding " + loc + " to cache");
+    }
+    ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = computeIfAbsent(cache,
+      loc.getRegionInfo().getTable(), AsyncRegionLocator::createTableCache);
+    byte[] startKey = loc.getRegionInfo().getStartKey();
+    HRegionLocation oldLoc = tableCache.putIfAbsent(startKey, loc);
+    if (oldLoc == null) {
+      return;
+    }
+    if (oldLoc.getSeqNum() > loc.getSeqNum()
+        || oldLoc.getServerName().equals(loc.getServerName())) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc
+            + " is newer than us or has the same server name");
+      }
+      return;
+    }
+    tableCache.compute(startKey, (k, oldValue) -> {
+      if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) {
+        return loc;
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue
+            + " is newer than us or has the same server name."
+            + " Maybe it is updated before we replace it");
+      }
+      return oldValue;
+    });
+  }
+
+  private HRegionLocation locateInCache(TableName tableName, byte[] row) {
+    ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = cache.get(tableName);
+    if (tableCache == null) {
+      return null;
+    }
+    Map.Entry<byte[], HRegionLocation> entry = tableCache.floorEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    HRegionLocation loc = entry.getValue();
+    byte[] endKey = loc.getRegionInfo().getEndKey();
+    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+      return loc;
+    } else {
+      return null;
+    }
+  }
+
+  private void onScanComplete(CompletableFuture<HRegionLocation> future, TableName tableName,
+      byte[] row, List<Result> results, Throwable error, String rowNameInErrorMsg,
+      Consumer<HRegionLocation> otherCheck) {
+    if (error != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to fetch location of '" + tableName + "', " + rowNameInErrorMsg + "='"
+            + Bytes.toStringBinary(row) + "'",
+          error);
+      }
+      future.completeExceptionally(error);
+      return;
+    }
+    if (results.isEmpty()) {
+      future.completeExceptionally(new TableNotFoundException(tableName));
+      return;
+    }
+    RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The fetched location of '" + tableName + "', " + rowNameInErrorMsg + "='"
+          + Bytes.toStringBinary(row) + "' is " + locs);
+    }
+    if (locs == null || locs.getDefaultRegionLocation() == null) {
+      future.completeExceptionally(
+        new IOException(String.format("No location found for '%s', %s='%s'", tableName,
+          rowNameInErrorMsg, Bytes.toStringBinary(row))));
+      return;
+    }
+    HRegionLocation loc = locs.getDefaultRegionLocation();
+    HRegionInfo info = loc.getRegionInfo();
+    if (info == null) {
+      future.completeExceptionally(
+        new IOException(String.format("HRegionInfo is null for '%s', %s='%s'", tableName,
+          rowNameInErrorMsg, Bytes.toStringBinary(row))));
+      return;
+    }
+    if (!info.getTable().equals(tableName)) {
+      future.completeExceptionally(new TableNotFoundException(
+          "Table '" + tableName + "' was not found, got: '" + info.getTable() + "'"));
+      return;
+    }
+    if (info.isSplit()) {
+      future.completeExceptionally(new RegionOfflineException(
+          "the only available region for the required row is a split parent,"
+              + " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"));
+      return;
+    }
+    if (info.isOffline()) {
+      future.completeExceptionally(new RegionOfflineException("the region is offline, could"
+          + " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"));
+      return;
+    }
+    if (loc.getServerName() == null) {
+      future.completeExceptionally(new NoServerForRegionException(
+          String.format("No server address listed for region '%s', %s='%s'",
+            info.getRegionNameAsString(), rowNameInErrorMsg, Bytes.toStringBinary(row))));
+      return;
+    }
+    otherCheck.accept(loc);
+    if (future.isDone()) {
+      return;
+    }
+    addToCache(loc);
+    future.complete(loc);
+  }
+
+  private CompletableFuture<HRegionLocation> locateInMeta(TableName tableName, byte[] row) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(row) + "' in meta");
+    }
+    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+    byte[] metaKey = createRegionName(tableName, row, NINES, false);
+    conn.getTable(META_TABLE_NAME)
+        .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
+        .whenComplete(
+          (results, error) -> onScanComplete(future, tableName, row, results, error, "row", loc -> {
+          }));
+    return future;
   }
 
-  void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception,
-      ServerName source) {
-    conn.updateCachedLocations(tableName, regionName, row, exception, source);
+  private CompletableFuture<HRegionLocation> locateRegion(TableName tableName, byte[] row) {
+    HRegionLocation loc = locateInCache(tableName, row);
+    if (loc != null) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
+            + Bytes.toStringBinary(row) + "'");
+      }
+      return CompletableFuture.completedFuture(loc);
+    }
+    return locateInMeta(tableName, row);
   }
 
-  @Override
-  public void close() {
-    IOUtils.closeQuietly(conn);
+  CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) {
+    if (tableName.equals(META_TABLE_NAME)) {
+      return locateMetaRegion();
+    } else {
+      return locateRegion(tableName, row);
+    }
+  }
+
+  private HRegionLocation locatePreviousInCache(TableName tableName,
+      byte[] startRowOfCurrentRegion) {
+    ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = cache.get(tableName);
+    if (tableCache == null) {
+      return null;
+    }
+    Map.Entry<byte[], HRegionLocation> entry;
+    if (isEmptyStopRow(startRowOfCurrentRegion)) {
+      entry = tableCache.lastEntry();
+    } else {
+      entry = tableCache.lowerEntry(startRowOfCurrentRegion);
+    }
+    if (entry == null) {
+      return null;
+    }
+    HRegionLocation loc = entry.getValue();
+    if (Bytes.equals(loc.getRegionInfo().getEndKey(), startRowOfCurrentRegion)) {
+      return loc;
+    } else {
+      return null;
+    }
+  }
+
+  private CompletableFuture<HRegionLocation> locatePreviousInMeta(TableName tableName,
+      byte[] startRowOfCurrentRegion) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Try locate '" + tableName + "', startRowOfCurrentRegion='"
+          + Bytes.toStringBinary(startRowOfCurrentRegion) + "' in meta");
+    }
+    byte[] metaKey;
+    if (isEmptyStopRow(startRowOfCurrentRegion)) {
+      byte[] binaryTableName = tableName.getName();
+      metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
+    } else {
+      metaKey = createRegionName(tableName, startRowOfCurrentRegion, ZEROES, false);
+    }
+    CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
+    conn.getTable(META_TABLE_NAME)
+        .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
+        .whenComplete((results, error) -> onScanComplete(future, tableName, startRowOfCurrentRegion,
+          results, error, "startRowOfCurrentRegion", loc -> {
+            HRegionInfo info = loc.getRegionInfo();
+            if (!Bytes.equals(info.getEndKey(), startRowOfCurrentRegion)) {
+              future.completeExceptionally(new IOException("The end key of '"
+                  + info.getRegionNameAsString() + "' is '" + Bytes.toStringBinary(info.getEndKey())
+                  + "', expected '" + Bytes.toStringBinary(startRowOfCurrentRegion) + "'"));
+            }
+          }));
+    return future;
+  }
+
+  private CompletableFuture<HRegionLocation> locatePreviousRegion(TableName tableName,
+      byte[] startRowOfCurrentRegion) {
+    HRegionLocation loc = locatePreviousInCache(tableName, startRowOfCurrentRegion);
+    if (loc != null) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found " + loc + " in cache for '" + tableName + "', startRowOfCurrentRegion='"
+            + Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
+      }
+      return CompletableFuture.completedFuture(loc);
+    }
+    return locatePreviousInMeta(tableName, startRowOfCurrentRegion);
+  }
+
+  /**
+   * Locate the previous region using the current regions start key. Used for reverse scan.
+   */
+  CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
+      byte[] startRowOfCurrentRegion) {
+    if (tableName.equals(META_TABLE_NAME)) {
+      return locateMetaRegion();
+    } else {
+      return locatePreviousRegion(tableName, startRowOfCurrentRegion);
+    }
+  }
+
+  private boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
+    // Do not need to update if no such location, or the location is newer, or the location is not
+    // same with us
+    return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum()
+        && oldLoc.getServerName().equals(loc.getServerName());
+  }
+
+  private void updateCachedLoation(HRegionLocation loc, Throwable exception,
+      Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
+      Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
+    HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Try updating " + loc + ", the old value is " + oldLoc, exception);
+    }
+    if (!canUpdate(loc, oldLoc)) {
+      return;
+    }
+    Throwable cause = findException(exception);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The actual exception when updating " + loc, cause);
+    }
+    if (cause == null || !isMetaClearingException(cause)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          "Will not update " + loc + " because the exception is null or not the one we care about");
+      }
+      return;
+    }
+    if (cause instanceof RegionMovedException) {
+      RegionMovedException rme = (RegionMovedException) cause;
+      HRegionLocation newLoc =
+          new HRegionLocation(loc.getRegionInfo(), rme.getServerName(), rme.getLocationSeqNum());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          "Try updating " + loc + " with the new location " + newLoc + " constructed by " + rme);
+      }
+      addToCache.accept(newLoc);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Try removing " + loc + " from cache");
+      }
+      removeFromCache.accept(loc);
+    }
+  }
+
+  void updateCachedLocation(HRegionLocation loc, Throwable exception) {
+    if (loc.getRegionInfo().isMetaTable()) {
+      updateCachedLoation(loc, exception, l -> metaRegionLocation.get(), newLoc -> {
+        for (;;) {
+          HRegionLocation oldLoc = metaRegionLocation.get();
+          if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum()
+              || oldLoc.getServerName().equals(newLoc.getServerName()))) {
+            return;
+          }
+          if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) {
+            return;
+          }
+        }
+      }, l -> {
+        for (;;) {
+          HRegionLocation oldLoc = metaRegionLocation.get();
+          if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) {
+            return;
+          }
+        }
+      });
+    } else {
+      updateCachedLoation(loc, exception, l -> {
+        ConcurrentNavigableMap<byte[], HRegionLocation> tableCache =
+            cache.get(l.getRegionInfo().getTable());
+        if (tableCache == null) {
+          return null;
+        }
+        return tableCache.get(l.getRegionInfo().getStartKey());
+      }, this::addToCache, this::removeFromCache);
+    }
+  }
+
+  void clearCache(TableName tableName) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Clear meta cache for " + tableName);
+    }
+    cache.remove(tableName);
   }
 }
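A note on the per-table cache above: locateInCache leans on ConcurrentSkipListMap.floorEntry, which returns the entry with the greatest start key less than or equal to the row, so a lookup is a single navigation plus an end-key check. A standalone sketch of that rule, not part of the patch, using Strings in place of byte[] row keys and hypothetical region names:

import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

public class FloorEntryLookupSketch {
  public static void main(String[] args) {
    ConcurrentSkipListMap<String, String> regionsByStartKey = new ConcurrentSkipListMap<>();
    // Regions: ["", "b") -> r1, ["b", "d") -> r2, ["d", "") -> r3 (empty end key = last region)
    regionsByStartKey.put("", "r1");
    regionsByStartKey.put("b", "r2");
    regionsByStartKey.put("d", "r3");

    String row = "c";
    Map.Entry<String, String> entry = regionsByStartKey.floorEntry(row);
    // entry.getKey() is "b", the greatest start key <= row. locateInCache then also
    // checks row < endKey (or the end key is empty) before trusting the hit, since a
    // floor match can land in a hole left by a split or an uncached region.
    System.out.println(row + " -> " + entry.getValue()); // prints "c -> r2"
  }
}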
@@ -68,8 +68,8 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
-    public SingleRequestCallerBuilder<T>
-        action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
+    public SingleRequestCallerBuilder<T> action(
+        AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
       this.callable = callable;
       return this;
     }
 
@@ -92,12 +92,9 @@ class AsyncRpcRetryingCallerFactory {
     public AsyncSingleRequestRpcRetryingCaller<T> build() {
       return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
          checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
-          locateToPreviousRegion
-              ? (c, tn, r, re) -> c.getLocator().getPreviousRegionLocation(tn, r, re)
-              : (c, tn, r, re) -> c.getLocator().getRegionLocation(tn, r, re),
-          checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(),
-          conn.connConf.getMaxRetries(), operationTimeoutNs, rpcTimeoutNs,
-          conn.connConf.getStartLogErrorsCnt());
+          locateToPreviousRegion, checkNotNull(callable, "action is null"),
+          conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
+          rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
     }
 
     /**
@@ -60,12 +60,6 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
         ClientService.Interface stub);
   }
 
-  @FunctionalInterface
-  public interface RegionLocator {
-    CompletableFuture<HRegionLocation> locate(AsyncConnectionImpl conn, TableName tableName,
-        byte[] row, boolean reload);
-  }
-
   private final HashedWheelTimer retryTimer;
 
   private final AsyncConnectionImpl conn;
 
@@ -74,7 +68,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
 
   private final byte[] row;
 
-  private final RegionLocator locator;
+  private final Supplier<CompletableFuture<HRegionLocation>> locate;
 
   private final Callable<T> callable;
 
@@ -97,13 +91,18 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
   private final long startNs;
 
   public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
-      TableName tableName, byte[] row, RegionLocator locator, Callable<T> callable, long pauseNs,
-      int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+      TableName tableName, byte[] row, boolean locateToPreviousRegion, Callable<T> callable,
+      long pauseNs, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs,
+      int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
    this.conn = conn;
    this.tableName = tableName;
    this.row = row;
-    this.locator = locator;
+    if (locateToPreviousRegion) {
+      this.locate = this::locatePrevious;
+    } else {
+      this.locate = this::locate;
+    }
    this.callable = callable;
    this.pauseNs = pauseNs;
    this.maxAttempts = retries2Attempts(maxRetries);
@@ -145,8 +144,9 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
    if (tries > startLogErrorsCnt) {
      LOG.warn(errMsg.get(), error);
    }
-    RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(
-        error, EnvironmentEdgeManager.currentTime(), "");
+    RetriesExhaustedException.ThrowableWithExtraContext qt =
+        new RetriesExhaustedException.ThrowableWithExtraContext(error,
+            EnvironmentEdgeManager.currentTime(), "");
    exceptions.add(qt);
    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
      completeExceptionally();
@@ -194,8 +194,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
            + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
            + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
            + elapsedMs() + " ms",
-          err -> conn.getLocator().updateCachedLocations(tableName,
-            loc.getRegionInfo().getRegionName(), row, err, loc.getServerName()));
+          err -> conn.getLocator().updateCachedLocation(loc, err));
        return;
      }
      resetController();
@@ -207,8 +206,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
              + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
              + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = "
              + elapsedMs() + " ms",
-            err -> conn.getLocator().updateCachedLocations(tableName,
-              loc.getRegionInfo().getRegionName(), row, err, loc.getServerName()));
+            err -> conn.getLocator().updateCachedLocation(loc, err));
        return;
      }
      future.complete(result);
@@ -216,7 +214,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
  }
 
  private void locateThenCall() {
-    locator.locate(conn, tableName, row, tries > 1).whenComplete((loc, error) -> {
+    locate.get().whenComplete((loc, error) -> {
      if (error != null) {
        onError(error,
          () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = "
@@ -231,6 +229,14 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
    });
  }
 
+  private CompletableFuture<HRegionLocation> locate() {
+    return conn.getLocator().getRegionLocation(tableName, row);
+  }
+
+  private CompletableFuture<HRegionLocation> locatePrevious() {
+    return conn.getLocator().getPreviousRegionLocation(tableName, row);
+  }
+
  public CompletableFuture<T> call() {
    locateThenCall();
    return future;
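The caller now fixes its locate strategy once, at construction time: a boolean picks one of two private method references and every retry simply invokes locate.get(), which replaces the per-call RegionLocator callback deleted above. A minimal standalone sketch of that dispatch pattern, not part of the patch, with String standing in for HRegionLocation:

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class LocateDispatchSketch {
  private final Supplier<CompletableFuture<String>> locate;

  LocateDispatchSketch(boolean locateToPreviousRegion) {
    // Bind once; every retry reuses the same supplier, so the old per-call
    // (conn, tableName, row, reload) plumbing disappears.
    if (locateToPreviousRegion) {
      this.locate = this::locatePrevious;
    } else {
      this.locate = this::locateForward;
    }
  }

  private CompletableFuture<String> locateForward() {
    return CompletableFuture.completedFuture("forward location");
  }

  private CompletableFuture<String> locatePrevious() {
    return CompletableFuture.completedFuture("previous location");
  }

  public static void main(String[] args) {
    new LocateDispatchSketch(true).locate.get().thenAccept(System.out::println);
  }
}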
@@ -45,6 +45,6 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
 
   @Override
   public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
-    return locator.getRegionLocation(tableName, row, reload);
+    return locator.getRegionLocation(tableName, row);
   }
 }
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 import java.util.function.Supplier;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
@@ -107,6 +108,19 @@ public class CollectionUtils {
     return list.get(list.size() - 1);
   }
 
+  /**
+   * In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the
+   * value already exists. So here we copy the implementation of
+   * {@link ConcurrentMap#computeIfAbsent(Object, java.util.function.Function)}. It uses get and
+   * putIfAbsent to implement computeIfAbsent. And notice that the implementation does not guarantee
+   * that the supplier will only be executed once.
+   */
+  public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Supplier<V> supplier) {
+    V v, newValue;
+    return ((v = map.get(key)) == null && (newValue = supplier.get()) != null
+        && (v = map.putIfAbsent(key, newValue)) == null) ? newValue : v;
+  }
+
   /**
    * A supplier that throws IOException when get.
    */
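The javadoc above is the key point: ConcurrentHashMap.computeIfAbsent can lock the bin even when the value is already present, so a plain get on the hot path is cheaper (the HBASE-16648 finding). A usage sketch, not part of the patch, mirroring how AsyncRegionLocator.addToCache calls the new helper; the map and table key are hypothetical stand-ins for the locator's per-table cache:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils;

public class ComputeIfAbsentSketch {
  public static void main(String[] args) {
    ConcurrentMap<String, ConcurrentSkipListMap<byte[], String>> cache =
        new ConcurrentHashMap<>();
    // Hot path: once "t1" exists this is a single lock-free get. On a miss the
    // supplier runs (possibly more than once under a race, as the javadoc warns)
    // and putIfAbsent keeps whichever value was inserted first.
    ConcurrentSkipListMap<byte[], String> tableCache = CollectionUtils.computeIfAbsent(
      cache, "t1", () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
    System.out.println(tableCache.isEmpty()); // prints "true"
  }
}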
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.TABLES_ON_MASTER;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Will split the table, and move region randomly when testing.
+ */
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncGetMultiThread {
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  private static int COUNT = 1000;
+
+  private static AsyncConnection CONN;
+
+  private static byte[][] SPLIT_KEYS;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
+    TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
+    TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
+    TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
+    TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
+    TEST_UTIL.startMiniCluster(5);
+    SPLIT_KEYS = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
+    AsyncTable table = CONN.getTable(TABLE_NAME);
+    List<CompletableFuture<?>> futures = new ArrayList<>();
+    IntStream.range(0, COUNT)
+        .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
+            .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))));
+    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
+    while (!stop.get()) {
+      int i = ThreadLocalRandom.current().nextInt(COUNT);
+      assertEquals(i,
+        Bytes.toInt(CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i))))
+            .get().getValue(FAMILY, QUALIFIER)));
+    }
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException, ExecutionException {
+    int numThreads = 20;
+    AtomicBoolean stop = new AtomicBoolean(false);
+    ExecutorService executor =
+        Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-"));
+    List<Future<?>> futures = new ArrayList<>();
+    IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {
+      run(stop);
+      return null;
+    })));
+    Collections.shuffle(Arrays.asList(SPLIT_KEYS), new Random(123));
+    Admin admin = TEST_UTIL.getAdmin();
+    for (byte[] splitPoint : SPLIT_KEYS) {
+      admin.split(TABLE_NAME, splitPoint);
+      for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
+        region.compact(true);
+      }
+      Thread.sleep(5000);
+      admin.balancer(true);
+      Thread.sleep(5000);
+      ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
+      ServerName newMetaServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+          .map(t -> t.getRegionServer().getServerName()).filter(s -> !s.equals(metaServer))
+          .findAny().get();
+      admin.move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
+        Bytes.toBytes(newMetaServer.getServerName()));
+      Thread.sleep(5000);
+    }
+    stop.set(true);
+    executor.shutdown();
+    for (Future<?> future : futures) {
+      future.get();
+    }
+  }
+}
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncRegionLocator {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static AsyncConnectionImpl CONN;
+
+  private static AsyncRegionLocator LOCATOR;
+
+  private static byte[][] SPLIT_KEYS;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.getAdmin().setBalancerRunning(false, true);
+    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
+    LOCATOR = CONN.getLocator();
+    SPLIT_KEYS = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @After
+  public void tearDownAfterTest() throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    if (admin.tableExists(TABLE_NAME)) {
+      if (admin.isTableEnabled(TABLE_NAME)) {
+        TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
+      }
+      TEST_UTIL.getAdmin().deleteTable(TABLE_NAME);
+    }
+    LOCATOR.clearCache(TABLE_NAME);
+  }
+
+  private void createSingleRegionTable() throws IOException, InterruptedException {
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @Test
+  public void testNoTable() throws InterruptedException {
+    try {
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+    }
+    try {
+      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+    }
+  }
+
+  @Test
+  public void testDisableTable() throws IOException, InterruptedException {
+    createSingleRegionTable();
+    TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
+    try {
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+    }
+    try {
+      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
+    }
+  }
+
+  private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName,
+      HRegionLocation loc) {
+    HRegionInfo info = loc.getRegionInfo();
+    assertEquals(TABLE_NAME, info.getTable());
+    assertArrayEquals(startKey, info.getStartKey());
+    assertArrayEquals(endKey, info.getEndKey());
+    assertEquals(serverName, loc.getServerName());
+  }
+
+  @Test
+  public void testSingleRegionTable() throws IOException, InterruptedException, ExecutionException {
+    createSingleRegionTable();
+    ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+      LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+    byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
+    ThreadLocalRandom.current().nextBytes(randKey);
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+      LOCATOR.getRegionLocation(TABLE_NAME, randKey).get());
+    // Use a key which is not the endKey of a region will cause error
+    try {
+      assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
+        LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }).get());
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(IOException.class));
+      assertTrue(e.getCause().getMessage().contains("end key of"));
+    }
+  }
+
+  private void createMultiRegionTable() throws IOException, InterruptedException {
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  private static byte[][] getStartKeys() {
+    byte[][] startKeys = new byte[SPLIT_KEYS.length + 1][];
+    startKeys[0] = EMPTY_START_ROW;
+    System.arraycopy(SPLIT_KEYS, 0, startKeys, 1, SPLIT_KEYS.length);
+    return startKeys;
+  }
+
+  private static byte[][] getEndKeys() {
+    byte[][] endKeys = Arrays.copyOf(SPLIT_KEYS, SPLIT_KEYS.length + 1);
+    endKeys[endKeys.length - 1] = EMPTY_START_ROW;
+    return endKeys;
+  }
+
+  @Test
+  public void testMultiRegionTable() throws IOException, InterruptedException {
+    createMultiRegionTable();
+    byte[][] startKeys = getStartKeys();
+    ServerName[] serverNames = new ServerName[startKeys.length];
+    TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
+        .forEach(rs -> {
+          rs.getOnlineRegions(TABLE_NAME).forEach(r -> {
+            serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
+              Bytes::compareTo)] = rs.getServerName();
+          });
+        });
+    IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
+      try {
+        assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
+          serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i]).get());
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    }));
+    LOCATOR.clearCache(TABLE_NAME);
+    byte[][] endKeys = getEndKeys();
+    IntStream.range(0, 2).forEach(
+      n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
+        try {
+          assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
+            LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i]).get());
+        } catch (InterruptedException | ExecutionException e) {
+          throw new RuntimeException(e);
+        }
+      }));
+  }
+
+  @Test
+  public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
+    createSingleRegionTable();
+    ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
+    HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
+    ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+        .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
+        .findAny().get();
+
+    TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegionInfo().getEncodedName()),
+      Bytes.toBytes(newServerName.getServerName()));
+    while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName()
+        .equals(newServerName)) {
+      Thread.sleep(100);
+    }
+    // Should be same as it is in cache
+    assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+    LOCATOR.updateCachedLocation(loc, null);
+    // null error will not trigger a cache cleanup
+    assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+    LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
+    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
+      LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
+  }
+}
@@ -34,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 
@@ -151,11 +150,9 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     AtomicBoolean errorTriggered = new AtomicBoolean(false);
     AtomicInteger count = new AtomicInteger(0);
     HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
-
-    try (AsyncRegionLocator mockedLocator = new AsyncRegionLocator(asyncConn.getConfiguration()) {
+    AsyncRegionLocator mockedLocator = new AsyncRegionLocator(asyncConn) {
       @Override
-      CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
-          boolean reload) {
+      CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) {
         if (tableName.equals(TABLE_NAME)) {
           CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
           if (count.getAndIncrement() == 0) {
@@ -166,17 +163,22 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
           }
           return future;
         } else {
-          return super.getRegionLocation(tableName, row, reload);
+          return super.getRegionLocation(tableName, row);
        }
      }
 
      @Override
-      void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row,
-          Object exception, ServerName source) {
+      CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
+          byte[] startRowOfCurrentRegion) {
+        return super.getPreviousRegionLocation(tableName, startRowOfCurrentRegion);
+      }
+
+      @Override
+      void updateCachedLocation(HRegionLocation loc, Throwable exception) {
      }
    };
-    AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(asyncConn.getConfiguration(),
-        User.getCurrent()) {
+    try (AsyncConnectionImpl mockedConn =
+        new AsyncConnectionImpl(asyncConn.getConfiguration(), User.getCurrent()) {
 
      @Override
      AsyncRegionLocator getLocator() {