HBASE-17356 Add replica get support

zhangduo 2019-01-01 21:59:37 +08:00
parent 2d8d74c64d
commit 77ca660389
25 changed files with 2368 additions and 1636 deletions
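The diff below rewires the async client from single HRegionLocation values to replica-aware RegionLocations. For orientation, a minimal sketch (not part of this commit's diff; the table name, row and scaffolding are made up) of the timeline-consistent get that replica get support serves:

// Not part of this commit: a minimal sketch of a timeline-consistent get,
// which is what replica get support enables in the async client.
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ReplicaGetSketch {
  public static void main(String[] args) throws Exception {
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection().get()) {
      Get get = new Get(Bytes.toBytes("row1"))
        // TIMELINE allows the read to be served by a secondary replica when
        // the primary does not answer within the primary call timeout
        .setConsistency(Consistency.TIMELINE);
      Result result = conn.getTable(TableName.valueOf("test")).get(get).get();
      // a result served by a secondary replica is marked stale
      System.out.println("stale=" + result.isStale());
    }
  }
}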

View File

@@ -56,8 +56,8 @@ public class RegionLocations {
int index = 0;
for (HRegionLocation loc : locations) {
if (loc != null) {
if (loc.getRegionInfo().getReplicaId() >= maxReplicaId) {
maxReplicaId = loc.getRegionInfo().getReplicaId();
if (loc.getRegion().getReplicaId() >= maxReplicaId) {
maxReplicaId = loc.getRegion().getReplicaId();
maxReplicaIdIndex = index;
}
}
@@ -72,7 +72,7 @@ public class RegionLocations {
this.locations = new HRegionLocation[maxReplicaId + 1];
for (HRegionLocation loc : locations) {
if (loc != null) {
this.locations[loc.getRegionInfo().getReplicaId()] = loc;
this.locations[loc.getRegion().getReplicaId()] = loc;
}
}
}
@@ -146,7 +146,7 @@ public class RegionLocations {
public RegionLocations remove(HRegionLocation location) {
if (location == null) return this;
if (location.getRegion() == null) return this;
int replicaId = location.getRegionInfo().getReplicaId();
int replicaId = location.getRegion().getReplicaId();
if (replicaId >= locations.length) return this;
// check whether something to remove. HRL.compareTo() compares ONLY the
@@ -203,14 +203,14 @@ public class RegionLocations {
// in case of region replication going down, we might have a leak here.
int max = other.locations.length;
HRegionInfo regionInfo = null;
RegionInfo regionInfo = null;
for (int i = 0; i < max; i++) {
HRegionLocation thisLoc = this.getRegionLocation(i);
HRegionLocation otherLoc = other.getRegionLocation(i);
if (regionInfo == null && otherLoc != null && otherLoc.getRegionInfo() != null) {
if (regionInfo == null && otherLoc != null && otherLoc.getRegion() != null) {
// regionInfo is the first non-null HRI from other RegionLocations. We use it to ensure that
// all replica region infos belong to the same region, with the same region id.
regionInfo = otherLoc.getRegionInfo();
regionInfo = otherLoc.getRegion();
}
HRegionLocation selectedLoc = selectRegionLocation(thisLoc,
@@ -232,7 +232,7 @@ public class RegionLocations {
for (int i=0; i < newLocations.length; i++) {
if (newLocations[i] != null) {
if (!RegionReplicaUtil.isReplicasForSameRegion(regionInfo,
newLocations[i].getRegionInfo())) {
newLocations[i].getRegion())) {
newLocations[i] = null;
}
}
@@ -273,9 +273,9 @@ public class RegionLocations {
boolean checkForEquals, boolean force) {
assert location != null;
int replicaId = location.getRegionInfo().getReplicaId();
int replicaId = location.getRegion().getReplicaId();
HRegionLocation oldLoc = getRegionLocation(location.getRegionInfo().getReplicaId());
HRegionLocation oldLoc = getRegionLocation(location.getRegion().getReplicaId());
HRegionLocation selectedLoc = selectRegionLocation(oldLoc, location,
checkForEquals, force);
@@ -288,8 +288,8 @@ public class RegionLocations {
// ensure that all replicas share the same start code. Otherwise delete them
for (int i=0; i < newLocations.length; i++) {
if (newLocations[i] != null) {
if (!RegionReplicaUtil.isReplicasForSameRegion(location.getRegionInfo(),
newLocations[i].getRegionInfo())) {
if (!RegionReplicaUtil.isReplicasForSameRegion(location.getRegion(),
newLocations[i].getRegion())) {
newLocations[i] = null;
}
}
@@ -317,8 +317,8 @@ public class RegionLocations {
public HRegionLocation getRegionLocationByRegionName(byte[] regionName) {
for (HRegionLocation loc : locations) {
if (loc != null) {
if (Bytes.equals(loc.getRegionInfo().getRegionName(), regionName)
|| Bytes.equals(loc.getRegionInfo().getEncodedNameAsBytes(), regionName)) {
if (Bytes.equals(loc.getRegion().getRegionName(), regionName)
|| Bytes.equals(loc.getRegion().getEncodedNameAsBytes(), regionName)) {
return loc;
}
}
@@ -331,7 +331,7 @@ public class RegionLocations {
}
public HRegionLocation getDefaultRegionLocation() {
return locations[HRegionInfo.DEFAULT_REPLICA_ID];
return locations[RegionInfo.DEFAULT_REPLICA_ID];
}
/**

View File

@@ -23,8 +23,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.ArrayList;
@@ -43,24 +42,26 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* Retry caller for batch.
@@ -121,10 +122,10 @@ class AsyncBatchRpcRetryingCaller<T> {
private static final class ServerRequest {
public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
public void addAction(HRegionLocation loc, Action action) {
computeIfAbsent(actionsByRegion, loc.getRegionInfo().getRegionName(),
computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
() -> new RegionRequest(loc)).actions.add(action);
}
}
@@ -173,11 +174,10 @@ class AsyncBatchRpcRetryingCaller<T> {
Throwable error, ServerName serverName) {
if (tries > startLogErrorsCnt) {
String regions =
regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'")
.collect(Collectors.joining(",", "[", "]"));
LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName
+ " failed, tries=" + tries,
error);
regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'")
.collect(Collectors.joining(",", "[", "]"));
LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName +
" failed, tries=" + tries, error);
}
}
@@ -191,7 +191,7 @@ class AsyncBatchRpcRetryingCaller<T> {
errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
}
errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
getExtraContextForError(serverName)));
getExtraContextForError(serverName)));
}
private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
@@ -204,7 +204,7 @@ class AsyncBatchRpcRetryingCaller<T> {
return;
}
ThrowableWithExtraContext errorWithCtx =
new ThrowableWithExtraContext(error, currentTime, extras);
new ThrowableWithExtraContext(error, currentTime, extras);
List<ThrowableWithExtraContext> errors = removeErrors(action);
if (errors == null) {
errors = Collections.singletonList(errorWithCtx);
@@ -227,7 +227,7 @@ class AsyncBatchRpcRetryingCaller<T> {
return;
}
future.completeExceptionally(new RetriesExhaustedException(tries,
Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
});
}
@@ -242,9 +242,9 @@ class AsyncBatchRpcRetryingCaller<T> {
// multiRequestBuilder will be populated with region actions.
// rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
// action list.
RequestConverter.buildNoDataRegionActions(entry.getKey(),
entry.getValue().actions, cells, multiRequestBuilder, regionActionBuilder, actionBuilder,
mutationBuilder, nonceGroup, rowMutationsIndexMap);
RequestConverter.buildNoDataRegionActions(entry.getKey(), entry.getValue().actions, cells,
multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
rowMutationsIndexMap);
}
return multiRequestBuilder.build();
}
@@ -254,15 +254,15 @@ class AsyncBatchRpcRetryingCaller<T> {
RegionResult regionResult, List<Action> failedActions, Throwable regionException) {
Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
if (result == null) {
LOG.error("Server " + serverName + " sent us neither result nor exception for row '"
+ Bytes.toStringBinary(action.getAction().getRow()) + "' of "
+ regionReq.loc.getRegionInfo().getRegionNameAsString());
LOG.error("Server " + serverName + " sent us neither result nor exception for row '" +
Bytes.toStringBinary(action.getAction().getRow()) + "' of " +
regionReq.loc.getRegion().getRegionNameAsString());
addError(action, new RuntimeException("Invalid response"), serverName);
failedActions.add(action);
} else if (result instanceof Throwable) {
Throwable error = translateException((Throwable) result);
logException(tries, () -> Stream.of(regionReq), error, serverName);
conn.getLocator().updateCachedLocation(regionReq.loc, error);
conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
getExtraContextForError(serverName));
@@ -281,20 +281,19 @@ class AsyncBatchRpcRetryingCaller<T> {
RegionResult regionResult = resp.getResults().get(rn);
Throwable regionException = resp.getException(rn);
if (regionResult != null) {
regionReq.actions.forEach(
action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions,
regionException));
regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
regionResult, failedActions, regionException));
} else {
Throwable error;
if (regionException == null) {
LOG.error(
"Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
LOG
.error("Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
error = new RuntimeException("Invalid response");
} else {
error = translateException(regionException);
}
logException(tries, () -> Stream.of(regionReq), error, serverName);
conn.getLocator().updateCachedLocation(regionReq.loc, error);
conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
failAll(regionReq.actions.stream(), tries, error, serverName);
return;
@@ -314,8 +313,7 @@ class AsyncBatchRpcRetryingCaller<T> {
remainingNs = remainingTimeNs();
if (remainingNs <= 0) {
failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream())
.flatMap(r -> r.actions.stream()),
tries);
.flatMap(r -> r.actions.stream()), tries);
return;
}
} else {
@@ -366,15 +364,15 @@ class AsyncBatchRpcRetryingCaller<T> {
ServerName serverName) {
Throwable error = translateException(t);
logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
actionsByRegion
.forEach((rn, regionReq) -> conn.getLocator().updateCachedLocation(regionReq.loc, error));
actionsByRegion.forEach(
(rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
serverName);
return;
}
List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
.collect(Collectors.toList());
.collect(Collectors.toList());
addError(copiedActions, error, serverName);
tryResubmit(copiedActions.stream(), tries);
}
@@ -407,30 +405,30 @@ class AsyncBatchRpcRetryingCaller<T> {
}
ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
CompletableFuture.allOf(actions
.map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
if (error != null) {
error = translateException(error);
if (error instanceof DoNotRetryIOException) {
failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
return;
}
addError(action, error, null);
locateFailed.add(action);
} else {
computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new)
.addAction(loc, action);
addListener(CompletableFuture.allOf(actions
.map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
if (error != null) {
error = translateException(error);
if (error instanceof DoNotRetryIOException) {
failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
return;
}
}))
.toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {
if (!actionsByServer.isEmpty()) {
send(actionsByServer, tries);
addError(action, error, null);
locateFailed.add(action);
} else {
computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc,
action);
}
if (!locateFailed.isEmpty()) {
tryResubmit(locateFailed.stream(), tries);
}
});
}))
.toArray(CompletableFuture[]::new)), (v, r) -> {
if (!actionsByServer.isEmpty()) {
send(actionsByServer, tries);
}
if (!locateFailed.isEmpty()) {
tryResubmit(locateFailed.stream(), tries);
}
});
}
public List<CompletableFuture<T>> call() {
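For context, a hedged sketch (not part of this commit) of how user code typically reaches this batch caller; the table and rows are made up, and an open AsyncConnection is assumed:

// Not part of this commit: a sketch of user code that ends up in this batch
// caller. Each action gets its own CompletableFuture, so one failed row does
// not fail the whole batch.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

class BatchGetSketch {
  static List<CompletableFuture<Result>> batchGet(AsyncConnection conn) {
    List<Get> gets = Arrays.asList(
      new Get(Bytes.toBytes("row1")),
      new Get(Bytes.toBytes("row2")));
    // the async table fans these out per region server via the batch caller
    return conn.getTable(TableName.valueOf("test")).get(gets);
  }
}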

View File

@@ -39,6 +39,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
@@ -94,6 +96,10 @@ class AsyncConnectionConfiguration {
private final long writeBufferPeriodicFlushTimeoutNs;
// this is for supporting region replica get, if the primary does not finish within this
// timeout, we will send requests to the secondaries.
private final long primaryCallTimeoutNs;
@SuppressWarnings("deprecation")
AsyncConnectionConfiguration(Configuration conf) {
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
@@ -124,6 +130,8 @@ class AsyncConnectionConfiguration {
this.writeBufferPeriodicFlushTimeoutNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT));
}
long getMetaOperationTimeoutNs() {
@@ -181,4 +189,8 @@ class AsyncConnectionConfiguration {
long getWriteBufferPeriodicFlushTimeoutNs() {
return writeBufferPeriodicFlushTimeoutNs;
}
long getPrimaryCallTimeoutNs() {
return primaryCallTimeoutNs;
}
}
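A sketch of how this timeout might be tuned, assuming the key behind PRIMARY_CALL_TIMEOUT_MICROSECOND resolves to "hbase.client.primaryCallTimeout.get" as defined in ConnectionConfiguration (the value is in microseconds):

// Not part of this commit: tuning the primary call timeout. The key name is
// an assumption based on ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

class PrimaryCallTimeoutSketch {
  static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    // wait at most 5ms (5000us) for the primary replica before asking secondaries
    conf.setLong("hbase.client.primaryCallTimeout.get", 5000);
    return conf;
  }
}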

View File

@@ -152,7 +152,6 @@ class AsyncConnectionImpl implements AsyncConnection {
}
// we will override this method for testing retry caller, so do not remove this method.
@VisibleForTesting
AsyncRegionLocator getLocator() {
return locator;
}

View File

@@ -17,11 +17,16 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.AsyncRegionLocator.*;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,43 +41,43 @@ class AsyncMetaRegionLocator {
private final AsyncRegistry registry;
private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>();
private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
new AtomicReference<>();
private final AtomicReference<CompletableFuture<RegionLocations>> metaRelocateFuture =
new AtomicReference<>();
AsyncMetaRegionLocator(AsyncRegistry registry) {
this.registry = registry;
}
CompletableFuture<HRegionLocation> getRegionLocation(boolean reload) {
/**
* Get the region locations for the meta region. If the location for the given replica is not
* available in the cached locations, then fetch it from the HBase cluster.
* <p/>
* The <code>replicaId</code> parameter is important. If the region replication config for meta
* region is changed, then the cached region locations may not have the locations for new
* replicas. If we do not check the location for the given replica, we will always return the
* cached region locations and cause an infinite loop.
*/
CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) {
for (;;) {
if (!reload) {
HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
if (metaRegionLocation != null) {
return CompletableFuture.completedFuture(metaRegionLocation);
RegionLocations locs = this.metaRegionLocations.get();
if (isGood(locs, replicaId)) {
return CompletableFuture.completedFuture(locs);
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("Meta region location cache is null, try fetching from registry.");
}
LOG.trace("Meta region location cache is null, try fetching from registry.");
if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Start fetching meta region location from registry.");
}
CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
LOG.debug("Start fetching meta region location from registry.");
CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
registry.getMetaRegionLocation().whenComplete((locs, error) -> {
if (error != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to fetch meta region location from registry", error);
}
LOG.debug("Failed to fetch meta region location from registry", error);
metaRelocateFuture.getAndSet(null).completeExceptionally(error);
return;
}
HRegionLocation loc = locs.getDefaultRegionLocation();
if (LOG.isDebugEnabled()) {
LOG.debug("The fetched meta region location is " + loc);
}
LOG.debug("The fetched meta region location is {}" + locs);
// Here we update cache before reset future, so it is possible that someone can get a
// stale value. Consider this:
// 1. update cache
@@ -82,12 +87,12 @@ class AsyncMetaRegionLocator {
// cleared in step 2.
// But we do not think it is a big deal as it rarely happens, and even if it happens, the
// caller will retry again later, so there are no correctness problems.
this.metaRegionLocation.set(loc);
this.metaRegionLocations.set(locs);
metaRelocateFuture.set(null);
future.complete(loc);
future.complete(locs);
});
} else {
CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
if (future != null) {
return future;
}
@@ -95,30 +100,56 @@ class AsyncMetaRegionLocator {
}
}
void updateCachedLocation(HRegionLocation loc, Throwable exception) {
AsyncRegionLocator.updateCachedLocation(loc, exception, l -> metaRegionLocation.get(),
newLoc -> {
for (;;) {
HRegionLocation oldLoc = metaRegionLocation.get();
if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum() ||
oldLoc.getServerName().equals(newLoc.getServerName()))) {
return;
}
if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) {
return;
}
private HRegionLocation getCacheLocation(HRegionLocation loc) {
RegionLocations locs = metaRegionLocations.get();
return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
}
private void addLocationToCache(HRegionLocation loc) {
for (;;) {
int replicaId = loc.getRegion().getReplicaId();
RegionLocations oldLocs = metaRegionLocations.get();
if (oldLocs == null) {
RegionLocations newLocs = createRegionLocations(loc);
if (metaRegionLocations.compareAndSet(null, newLocs)) {
return;
}
}, l -> {
for (;;) {
HRegionLocation oldLoc = metaRegionLocation.get();
if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) {
return;
}
}
});
}
HRegionLocation oldLoc = oldLocs.getRegionLocation(replicaId);
if (oldLoc != null && (oldLoc.getSeqNum() > loc.getSeqNum() ||
oldLoc.getServerName().equals(loc.getServerName()))) {
return;
}
RegionLocations newLocs = replaceRegionLocation(oldLocs, loc);
if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
return;
}
}
}
private void removeLocationFromCache(HRegionLocation loc) {
for (;;) {
RegionLocations oldLocs = metaRegionLocations.get();
if (oldLocs == null) {
return;
}
HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
if (!canUpdateOnError(loc, oldLoc)) {
return;
}
RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
return;
}
}
}
void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
this::addLocationToCache, this::removeLocationFromCache);
}
void clearCache() {
metaRegionLocation.set(null);
metaRegionLocations.set(null);
}
}
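To illustrate the replica-aware check that the Javadoc and loop above rely on, a small sketch (not part of this commit) restating the isGood logic from AsyncRegionLocatorHelper in caller terms:

package org.apache.hadoop.hbase.client;

import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;

// Not part of this commit: restates the isGood() check. If region replication
// for meta is raised while an entry with only replica 0 is cached, a request
// for a new replica id finds no usable location here, so the locator refetches
// from the registry instead of looping on the stale cache entry forever.
class IsGoodSketch {
  static boolean hasUsableLocation(RegionLocations locs, int replicaId) {
    if (locs == null) {
      return false;
    }
    HRegionLocation loc = locs.getRegionLocation(replicaId);
    // usable means the replica slot exists and carries a live server name
    return loc != null && loc.getServerName() != null;
  }
}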

View File

@@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.NINES;
import static org.apache.hadoop.hbase.HConstants.ZEROES;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.mergeRegionLocations;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
@@ -39,7 +44,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
@@ -53,6 +60,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Objects;
/**
* The asynchronous locator for regions other than meta.
@@ -83,9 +91,9 @@ class AsyncNonMetaRegionLocator {
private static final class LocateRequest {
public final byte[] row;
private final byte[] row;
public final RegionLocateType locateType;
private final RegionLocateType locateType;
public LocateRequest(byte[] row, RegionLocateType locateType) {
this.row = row;
@@ -109,12 +117,12 @@ class AsyncNonMetaRegionLocator {
private static final class TableCache {
public final ConcurrentNavigableMap<byte[], HRegionLocation> cache =
private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
public final Set<LocateRequest> pendingRequests = new HashSet<>();
private final Set<LocateRequest> pendingRequests = new HashSet<>();
public final Map<LocateRequest, CompletableFuture<HRegionLocation>> allRequests =
private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
new LinkedHashMap<>();
public boolean hasQuota(int max) {
@@ -133,25 +141,29 @@ class AsyncNonMetaRegionLocator {
return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
}
public void clearCompletedRequests(Optional<HRegionLocation> location) {
for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter =
public void clearCompletedRequests(Optional<RegionLocations> locations) {
for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
allRequests.entrySet().iterator(); iter.hasNext();) {
Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next();
if (tryComplete(entry.getKey(), entry.getValue(), location)) {
Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
iter.remove();
}
}
}
private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future,
Optional<HRegionLocation> location) {
private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
Optional<RegionLocations> locations) {
if (future.isDone()) {
return true;
}
if (!location.isPresent()) {
if (!locations.isPresent()) {
return false;
}
HRegionLocation loc = location.get();
RegionLocations locs = locations.get();
HRegionLocation loc = ObjectUtils.firstNonNull(locs.getRegionLocations());
// we should at least have one location available, otherwise the request should fail and
// should not arrive here
assert loc != null;
boolean completed;
if (req.locateType.equals(RegionLocateType.BEFORE)) {
// for locating the row before current row, the common case is to find the previous region
@@ -166,7 +178,7 @@ class AsyncNonMetaRegionLocator {
completed = loc.getRegion().containsRow(req.row);
}
if (completed) {
future.complete(loc);
future.complete(locs);
return true;
} else {
return false;
@@ -186,59 +198,59 @@ class AsyncNonMetaRegionLocator {
return computeIfAbsent(cache, tableName, TableCache::new);
}
private void removeFromCache(HRegionLocation loc) {
TableCache tableCache = cache.get(loc.getRegion().getTable());
if (tableCache == null) {
return;
private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
HRegionLocation[] locArr1 = locs1.getRegionLocations();
HRegionLocation[] locArr2 = locs2.getRegionLocations();
if (locArr1.length != locArr2.length) {
return false;
}
tableCache.cache.computeIfPresent(loc.getRegion().getStartKey(), (k, oldLoc) -> {
if (oldLoc.getSeqNum() > loc.getSeqNum() ||
!oldLoc.getServerName().equals(loc.getServerName())) {
return oldLoc;
for (int i = 0; i < locArr1.length; i++) {
// do not need to compare region info
HRegionLocation loc1 = locArr1[i];
HRegionLocation loc2 = locArr2[i];
if (loc1 == null) {
if (loc2 != null) {
return false;
}
} else {
if (loc2 == null) {
return false;
}
if (loc1.getSeqNum() != loc2.getSeqNum()) {
return false;
}
if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
return false;
}
}
return null;
});
}
return true;
}
// return whether we add this loc to cache
private boolean addToCache(TableCache tableCache, HRegionLocation loc) {
if (LOG.isTraceEnabled()) {
LOG.trace("Try adding " + loc + " to cache");
}
byte[] startKey = loc.getRegion().getStartKey();
HRegionLocation oldLoc = tableCache.cache.putIfAbsent(startKey, loc);
if (oldLoc == null) {
return true;
}
if (oldLoc.getSeqNum() > loc.getSeqNum() ||
oldLoc.getServerName().equals(loc.getServerName())) {
if (LOG.isTraceEnabled()) {
LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc +
" is newer than us or has the same server name");
private boolean addToCache(TableCache tableCache, RegionLocations locs) {
LOG.trace("Try adding {} to cache", locs);
byte[] startKey = locs.getDefaultRegionLocation().getRegion().getStartKey();
for (;;) {
RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
if (oldLocs == null) {
return true;
}
return false;
}
return loc == tableCache.cache.compute(startKey, (k, oldValue) -> {
if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) {
return loc;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue +
" is newer than us or has the same server name." +
" Maybe it is updated before we replace it");
}
return oldValue;
});
RegionLocations mergedLocs = mergeRegionLocations(locs, oldLocs);
if (isEqual(mergedLocs, oldLocs)) {
// the merged one is the same as the old one, give up
LOG.trace("Will not add {} to cache because the old value {} " +
" is newer than us or has the same server name." +
" Maybe it is updated before we replace it", locs, oldLocs);
return false;
}
if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
return true;
}
}
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
justification = "Called by lambda expression")
private void addToCache(HRegionLocation loc) {
addToCache(getTableCache(loc.getRegion().getTable()), loc);
LOG.trace("Try adding {} to cache", loc);
}
private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
Throwable error) {
if (error != null) {
LOG.warn("Failed to locate region in '" + tableName + "', row='" +
@@ -246,8 +258,8 @@ class AsyncNonMetaRegionLocator {
}
Optional<LocateRequest> toSend = Optional.empty();
TableCache tableCache = getTableCache(tableName);
if (loc != null) {
if (!addToCache(tableCache, loc)) {
if (locs != null) {
if (!addToCache(tableCache, locs)) {
// someone is ahead of us.
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
@@ -269,7 +281,7 @@ class AsyncNonMetaRegionLocator {
future.completeExceptionally(error);
}
}
tableCache.clearCompletedRequests(Optional.ofNullable(loc));
tableCache.clearCompletedRequests(Optional.ofNullable(locs));
// Remove a completed locate request in a synchronized block, so the table cache must have
// quota to send a candidate request.
toSend = tableCache.getCandidate();
@@ -286,9 +298,11 @@ class AsyncNonMetaRegionLocator {
Bytes.toStringBinary(req.row), req.locateType, locs);
}
// the default region location should always be present when fetching from meta, otherwise
// let's fail the request.
if (locs == null || locs.getDefaultRegionLocation() == null) {
complete(tableName, req, null,
new IOException(String.format("No location found for '%s', row='%s', locateType=%s",
new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
tableName, Bytes.toStringBinary(req.row), req.locateType)));
return true;
}
@@ -296,58 +310,60 @@ class AsyncNonMetaRegionLocator {
RegionInfo info = loc.getRegion();
if (info == null) {
complete(tableName, req, null,
new IOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
tableName, Bytes.toStringBinary(req.row), req.locateType)));
return true;
}
if (info.isSplitParent()) {
return false;
}
if (loc.getServerName() == null) {
complete(tableName, req, null,
new IOException(
String.format("No server address listed for region '%s', row='%s', locateType=%s",
info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType)));
return true;
}
complete(tableName, req, loc, null);
complete(tableName, req, locs, null);
return true;
}
private HRegionLocation locateRowInCache(TableCache tableCache, TableName tableName, byte[] row) {
Map.Entry<byte[], HRegionLocation> entry = tableCache.cache.floorEntry(row);
private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
int replicaId) {
Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
if (entry == null) {
return null;
}
HRegionLocation loc = entry.getValue();
RegionLocations locs = entry.getValue();
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
return null;
}
byte[] endKey = loc.getRegion().getEndKey();
if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT);
LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
}
return loc;
return locs;
} else {
return null;
}
}
private HRegionLocation locateRowBeforeInCache(TableCache tableCache, TableName tableName,
byte[] row) {
private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
byte[] row, int replicaId) {
boolean isEmptyStopRow = isEmptyStopRow(row);
Map.Entry<byte[], HRegionLocation> entry =
isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
Map.Entry<byte[], RegionLocations> entry =
isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
if (entry == null) {
return null;
}
HRegionLocation loc = entry.getValue();
RegionLocations locs = entry.getValue();
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
return null;
}
if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
(!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" +
Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE);
LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
}
return loc;
return locs;
} else {
return null;
}
@@ -390,8 +406,8 @@ class AsyncNonMetaRegionLocator {
if (tableNotFound) {
complete(tableName, req, null, new TableNotFoundException(tableName));
} else if (!completeNormally) {
complete(tableName, req, null, new IOException(
"Unable to find region for " + Bytes.toStringBinary(req.row) + " in " + tableName));
complete(tableName, req, null, new IOException("Unable to find region for '" +
Bytes.toStringBinary(req.row) + "' in " + tableName));
}
}
@@ -423,13 +439,12 @@ class AsyncNonMetaRegionLocator {
continue;
}
RegionInfo info = loc.getRegion();
if (info == null || info.isOffline() || info.isSplitParent() ||
loc.getServerName() == null) {
if (info == null || info.isOffline() || info.isSplitParent()) {
continue;
}
if (addToCache(tableCache, loc)) {
if (addToCache(tableCache, locs)) {
synchronized (tableCache) {
tableCache.clearCompletedRequests(Optional.of(loc));
tableCache.clearCompletedRequests(Optional.of(locs));
}
}
}
@@ -438,36 +453,36 @@ class AsyncNonMetaRegionLocator {
});
}
private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row,
RegionLocateType locateType) {
private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
int replicaId, RegionLocateType locateType) {
return locateType.equals(RegionLocateType.BEFORE)
? locateRowBeforeInCache(tableCache, tableName, row)
: locateRowInCache(tableCache, tableName, row);
? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
: locateRowInCache(tableCache, tableName, row, replicaId);
}
// locateToPrevious is true means we will use the start key of a region to locate the region
// placed before it. Used for reverse scan. See the comment of
// AsyncRegionLocator.getPreviousRegionLocation.
private CompletableFuture<HRegionLocation> getRegionLocationInternal(TableName tableName,
byte[] row, RegionLocateType locateType, boolean reload) {
private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
// AFTER should be converted to CURRENT before calling this method
assert !locateType.equals(RegionLocateType.AFTER);
TableCache tableCache = getTableCache(tableName);
if (!reload) {
HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType);
if (loc != null) {
return CompletableFuture.completedFuture(loc);
RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
if (isGood(locs, replicaId)) {
return CompletableFuture.completedFuture(locs);
}
}
CompletableFuture<HRegionLocation> future;
CompletableFuture<RegionLocations> future;
LocateRequest req;
boolean sendRequest = false;
synchronized (tableCache) {
// check again
if (!reload) {
HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType);
if (loc != null) {
return CompletableFuture.completedFuture(loc);
RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
if (isGood(locs, replicaId)) {
return CompletableFuture.completedFuture(locs);
}
}
req = new LocateRequest(row, locateType);
@@ -487,28 +502,58 @@ class AsyncNonMetaRegionLocator {
return future;
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
RegionLocateType locateType, boolean reload) {
if (locateType.equals(RegionLocateType.BEFORE)) {
return getRegionLocationInternal(tableName, row, locateType, reload);
} else {
// as we know the exact row after us, so we can just create the new row, and use the same
// algorithm to locate it.
if (locateType.equals(RegionLocateType.AFTER)) {
row = createClosestRowAfter(row);
CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
int replicaId, RegionLocateType locateType, boolean reload) {
// as we know the exact row after us, we can just create the new row and use the same
// algorithm to locate it.
if (locateType.equals(RegionLocateType.AFTER)) {
row = createClosestRowAfter(row);
locateType = RegionLocateType.CURRENT;
}
return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
}
private void removeLocationFromCache(HRegionLocation loc) {
TableCache tableCache = cache.get(loc.getRegion().getTable());
if (tableCache == null) {
return;
}
byte[] startKey = loc.getRegion().getStartKey();
for (;;) {
RegionLocations oldLocs = tableCache.cache.get(startKey);
if (oldLocs == null) {
// nothing cached for this start key, nothing to remove
return;
}
HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
if (!canUpdateOnError(loc, oldLoc)) {
return;
}
RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
if (newLocs == null) {
if (tableCache.cache.remove(startKey, oldLocs)) {
return;
}
} else {
if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
return;
}
}
return getRegionLocationInternal(tableName, row, RegionLocateType.CURRENT, reload);
}
}
void updateCachedLocation(HRegionLocation loc, Throwable exception) {
AsyncRegionLocator.updateCachedLocation(loc, exception, l -> {
TableCache tableCache = cache.get(l.getRegion().getTable());
if (tableCache == null) {
return null;
}
return tableCache.cache.get(l.getRegion().getStartKey());
}, this::addToCache, this::removeFromCache);
private void addLocationToCache(HRegionLocation loc) {
addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
}
private HRegionLocation getCachedLocation(HRegionLocation loc) {
TableCache tableCache = cache.get(loc.getRegion().getTable());
if (tableCache == null) {
return null;
}
RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey());
return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
}
void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
this::addLocationToCache, this::removeLocationFromCache);
}
void clearCache(TableName tableName) {
@@ -526,11 +571,11 @@ class AsyncNonMetaRegionLocator {
// only used for testing whether we have cached the location for a region.
@VisibleForTesting
HRegionLocation getRegionLocationInCache(TableName tableName, byte[] row) {
RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
TableCache tableCache = cache.get(tableName);
if (tableCache == null) {
return null;
}
return locateRowInCache(tableCache, tableName, row);
return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
}
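For context, a toy example (not part of this commit) of the floorEntry-based lookup that locateRowInCache uses: locations are keyed by region start key in a sorted map, so floorEntry(row) yields the last region starting at or before the row, and the caller still checks the end key:

// Not part of this commit: a toy illustration of the floorEntry lookup idea.
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.hbase.util.Bytes;

class FloorEntrySketch {
  public static void main(String[] args) {
    ConcurrentNavigableMap<byte[], String> cache =
      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
    cache.put(Bytes.toBytes(""), "region1");  // ["", "b")
    cache.put(Bytes.toBytes("b"), "region2"); // ["b", "d")
    cache.put(Bytes.toBytes("d"), "region3"); // ["d", "")
    Map.Entry<byte[], String> entry = cache.floorEntry(Bytes.toBytes("c"));
    // prints region2, the region whose start key is the greatest one <= "c"
    System.out.println(entry.getValue());
  }
}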

View File

@@ -18,26 +18,24 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
/**
* The asynchronous region locator.
@@ -59,8 +57,8 @@ class AsyncRegionLocator {
this.retryTimer = retryTimer;
}
private CompletableFuture<HRegionLocation> withTimeout(CompletableFuture<HRegionLocation> future,
long timeoutNs, Supplier<String> timeoutMsg) {
private <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutNs,
Supplier<String> timeoutMsg) {
if (future.isDone() || timeoutNs <= 0) {
return future;
}
@@ -78,17 +76,63 @@ class AsyncRegionLocator {
});
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
private boolean isMeta(TableName tableName) {
return TableName.isMetaTableName(tableName);
}
CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
RegionLocateType type, boolean reload, long timeoutNs) {
// meta region can not be split right now so we always call the same method.
// Change it later if the meta table can have more than one region.
CompletableFuture<HRegionLocation> future =
tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation(reload)
: nonMetaRegionLocator.getRegionLocation(tableName, row, type, reload);
CompletableFuture<RegionLocations> future = isMeta(tableName)
? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload)
: nonMetaRegionLocator.getRegionLocations(tableName, row,
RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload);
return withTimeout(future, timeoutNs,
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region location for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "'");
"ms) waiting for region locations for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "'");
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
// meta region can not be split right now so we always call the same method.
// Change it later if the meta table can have more than one region.
CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
CompletableFuture<RegionLocations> locsFuture =
isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload)
: nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
addListener(locsFuture, (locs, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
future
.completeExceptionally(new RegionException("No location for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
} else if (loc.getServerName() == null) {
future.completeExceptionally(new HBaseIOException("No server address listed for region '" +
loc.getRegion().getRegionNameAsString() + "', row='" + Bytes.toStringBinary(row) +
"', locateType=" + type + ", replicaId=" + replicaId));
} else {
future.complete(loc);
}
});
return withTimeout(future, timeoutNs,
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region location for " + tableName + ", row='" + Bytes.toStringBinary(row) +
"', replicaId=" + replicaId);
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
int replicaId, RegionLocateType type, long timeoutNs) {
return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs);
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
RegionLocateType type, boolean reload, long timeoutNs) {
return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload,
timeoutNs);
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
@@ -96,56 +140,11 @@ class AsyncRegionLocator {
return getRegionLocation(tableName, row, type, false, timeoutNs);
}
static boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
// Do not need to update if no such location, or the location is newer, or the location is not
// same with us
return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() &&
oldLoc.getServerName().equals(loc.getServerName());
}
static void updateCachedLocation(HRegionLocation loc, Throwable exception,
Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
if (LOG.isDebugEnabled()) {
LOG.debug("Try updating " + loc + ", the old value is " + oldLoc, exception);
}
if (!canUpdate(loc, oldLoc)) {
return;
}
Throwable cause = findException(exception);
if (LOG.isDebugEnabled()) {
LOG.debug("The actual exception when updating " + loc, cause);
}
if (cause == null || !isMetaClearingException(cause)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Will not update " + loc + " because the exception is null or not the one we care about");
}
return;
}
if (cause instanceof RegionMovedException) {
RegionMovedException rme = (RegionMovedException) cause;
HRegionLocation newLoc =
new HRegionLocation(loc.getRegionInfo(), rme.getServerName(), rme.getLocationSeqNum());
if (LOG.isDebugEnabled()) {
LOG.debug(
"Try updating " + loc + " with the new location " + newLoc + " constructed by " + rme);
}
addToCache.accept(newLoc);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Try removing " + loc + " from cache");
}
removeFromCache.accept(loc);
}
}
void updateCachedLocation(HRegionLocation loc, Throwable exception) {
void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
if (loc.getRegion().isMetaRegion()) {
metaRegionLocator.updateCachedLocation(loc, exception);
metaRegionLocator.updateCachedLocationOnError(loc, exception);
} else {
nonMetaRegionLocator.updateCachedLocation(loc, exception);
nonMetaRegionLocator.updateCachedLocationOnError(loc, exception);
}
}

View File

@@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Helper class for asynchronous region locator.
*/
@InterfaceAudience.Private
final class AsyncRegionLocatorHelper {
private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocatorHelper.class);
private AsyncRegionLocatorHelper() {
}
static boolean canUpdateOnError(HRegionLocation loc, HRegionLocation oldLoc) {
// Do not need to update if no such location, or the location is newer, or the location is not
// the same as ours
return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() &&
oldLoc.getServerName().equals(loc.getServerName());
}
static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
LOG.debug("Try updating {} , the old value is {}", loc, oldLoc, exception);
if (!canUpdateOnError(loc, oldLoc)) {
return;
}
Throwable cause = findException(exception);
LOG.debug("The actual exception when updating {}", loc, cause);
if (cause == null || !isMetaClearingException(cause)) {
LOG.debug("Will not update {} because the exception is null or not the one we care about",
loc);
return;
}
if (cause instanceof RegionMovedException) {
RegionMovedException rme = (RegionMovedException) cause;
HRegionLocation newLoc =
new HRegionLocation(loc.getRegion(), rme.getServerName(), rme.getLocationSeqNum());
LOG.debug("Try updating {} with the new location {} constructed by {}", loc, newLoc, rme);
addToCache.accept(newLoc);
} else {
LOG.debug("Try removing {} from cache", loc);
removeFromCache.accept(loc);
}
}
static RegionLocations createRegionLocations(HRegionLocation loc) {
int replicaId = loc.getRegion().getReplicaId();
HRegionLocation[] locs = new HRegionLocation[replicaId + 1];
locs[replicaId] = loc;
return new RegionLocations(locs);
}
/**
* Create a new {@link RegionLocations} based on the given {@code oldLocs}, and replace the
* location for the given {@code replicaId} with the given {@code loc}.
* <p/>
* All the {@link RegionLocations} in the async locator related classes are immutable because we
* access them concurrently, so here we need to create a new one, instead of calling
* {@link RegionLocations#updateLocation(HRegionLocation, boolean, boolean)}.
*/
static RegionLocations replaceRegionLocation(RegionLocations oldLocs, HRegionLocation loc) {
int replicaId = loc.getRegion().getReplicaId();
HRegionLocation[] locs = oldLocs.getRegionLocations();
locs = Arrays.copyOf(locs, Math.max(replicaId + 1, locs.length));
locs[replicaId] = loc;
return new RegionLocations(locs);
}
/**
* Create a new {@link RegionLocations} based on the given {@code oldLocs}, and remove the
* location for the given {@code replicaId}.
* <p/>
* All the {@link RegionLocations} in the async locator related classes are immutable because we
* access them concurrently, so here we need to create a new one, instead of calling
* {@link RegionLocations#remove(int)}.
*/
static RegionLocations removeRegionLocation(RegionLocations oldLocs, int replicaId) {
HRegionLocation[] locs = oldLocs.getRegionLocations();
if (locs.length < replicaId + 1) {
// Here we do not modify the oldLocs so it is safe to return it.
return oldLocs;
}
locs = Arrays.copyOf(locs, locs.length);
locs[replicaId] = null;
if (ObjectUtils.firstNonNull(locs) != null) {
return new RegionLocations(locs);
} else {
// if all the locations are null, just return null
return null;
}
}
/**
* Create a new {@link RegionLocations} which is the merging result for the given two
* {@link RegionLocations}.
* <p/>
* All the {@link RegionLocations} in the async locator related classes are immutable because we
* access them concurrently, so here we need to create a new one, instead of calling
* {@link RegionLocations#mergeLocations(RegionLocations)} directly.
*/
static RegionLocations mergeRegionLocations(RegionLocations newLocs, RegionLocations oldLocs) {
RegionLocations locs = new RegionLocations(newLocs.getRegionLocations());
locs.mergeLocations(oldLocs);
return locs;
}
static boolean isGood(RegionLocations locs, int replicaId) {
if (locs == null) {
return false;
}
HRegionLocation loc = locs.getRegionLocation(replicaId);
return loc != null && loc.getServerName() != null;
}
}
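A short sketch (not part of this commit) of how these immutable helpers compose; primary and secondary are assumed HRegionLocation values for replicas 0 and 1 of the same region, and the class lives in the same package because the helpers are package-private:

package org.apache.hadoop.hbase.client;

import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;

// Not part of this commit: composing the immutable helpers above.
class AsyncRegionLocatorHelperSketch {
  static RegionLocations demo(HRegionLocation primary, HRegionLocation secondary) {
    RegionLocations locs = AsyncRegionLocatorHelper.createRegionLocations(primary); // [primary]
    locs = AsyncRegionLocatorHelper.replaceRegionLocation(locs, secondary); // [primary, secondary]
    locs = AsyncRegionLocatorHelper.removeRegionLocation(locs, 0); // [null, secondary]
    // every call returned a fresh RegionLocations and never mutated its input,
    // which is what makes the compare-and-set loops in the locators safe
    return locs;
  }
}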

View File

@@ -88,15 +88,15 @@ public abstract class AsyncRpcRetryingCaller<T> {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
}
protected long remainingTimeNs() {
protected final long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
}
protected void completeExceptionally() {
protected final void completeExceptionally() {
future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
}
protected void resetCallTimeout() {
protected final void resetCallTimeout() {
long callTimeoutNs;
if (operationTimeoutNs > 0) {
callTimeoutNs = remainingTimeNs();
@@ -111,8 +111,15 @@ public abstract class AsyncRpcRetryingCaller<T> {
resetController(controller, callTimeoutNs);
}
protected void onError(Throwable error, Supplier<String> errMsg,
protected final void onError(Throwable error, Supplier<String> errMsg,
Consumer<Throwable> updateCachedLocation) {
if (future.isDone()) {
// Give up if the future is already done, this is possible if the user has already canceled the
// future. And for timeline consistent read, we will also cancel some requests if we have
// already got one of the responses.
LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
return;
}
error = translateException(error);
if (error instanceof DoNotRetryIOException) {
future.completeExceptionally(error);

View File

@@ -17,22 +17,22 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
@ -75,6 +75,8 @@ class AsyncRpcRetryingCallerFactory {
private RegionLocateType locateType = RegionLocateType.CURRENT;
private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
public SingleRequestCallerBuilder<T> table(TableName tableName) {
this.tableName = tableName;
return this;
@ -121,11 +123,17 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public SingleRequestCallerBuilder<T> replicaId(int replicaId) {
this.replicaId = replicaId;
return this;
}
public AsyncSingleRequestRpcRetryingCaller<T> build() {
checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), replicaId,
checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
/**
@ -241,11 +249,11 @@ class AsyncRpcRetryingCallerFactory {
public AsyncScanSingleRegionRpcRetryingCaller build() {
checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
checkNotNull(resultCache, "resultCache is null"),
checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
checkNotNull(scan, "scan is null"), scanMetrics, scannerId,
checkNotNull(resultCache, "resultCache is null"),
checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs,
pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
/**
@ -311,7 +319,7 @@ class AsyncRpcRetryingCallerFactory {
public <T> AsyncBatchRpcRetryingCaller<T> build() {
return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
public <T> List<CompletableFuture<T>> call() {
@ -363,8 +371,8 @@ class AsyncRpcRetryingCallerFactory {
public AsyncMasterRequestRpcRetryingCaller<T> build() {
return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn,
checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
}
/**
@ -390,7 +398,8 @@ class AsyncRpcRetryingCallerFactory {
private ServerName serverName;
public AdminRequestCallerBuilder<T> action(AsyncAdminRequestRetryingCaller.Callable<T> callable) {
public AdminRequestCallerBuilder<T> action(
AsyncAdminRequestRetryingCaller.Callable<T> callable) {
this.callable = callable;
return this;
}
@ -420,15 +429,15 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public AdminRequestCallerBuilder<T> serverName(ServerName serverName){
public AdminRequestCallerBuilder<T> serverName(ServerName serverName) {
this.serverName = serverName;
return this;
}
public AsyncAdminRequestRetryingCaller<T> build() {
return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName,
"serverName is null"), checkNotNull(callable, "action is null"));
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
}
public CompletableFuture<T> call() {
@ -436,7 +445,7 @@ class AsyncRpcRetryingCallerFactory {
}
}
public <T> AdminRequestCallerBuilder<T> adminRequest(){
public <T> AdminRequestCallerBuilder<T> adminRequest() {
return new AdminRequestCallerBuilder<>();
}
@ -488,8 +497,8 @@ class AsyncRpcRetryingCallerFactory {
public AsyncServerRequestRpcRetryingCaller<T> build() {
return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName,
"serverName is null"), checkNotNull(callable, "action is null"));
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
}
public CompletableFuture<T> call() {

View File

@ -17,17 +17,19 @@
*/
package org.apache.hadoop.hbase.client;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
/**
* Retry caller for a single request, such as get, put, delete, etc.
@ -45,18 +47,21 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
private final byte[] row;
private final int replicaId;
private final RegionLocateType locateType;
private final Callable<T> callable;
public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
TableName tableName, byte[] row, RegionLocateType locateType, Callable<T> callable,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt) {
TableName tableName, byte[] row, int replicaId, RegionLocateType locateType,
Callable<T> callable, long pauseNs, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
startLogErrorsCnt);
this.tableName = tableName;
this.row = row;
this.replicaId = replicaId;
this.locateType = locateType;
this.callable = callable;
}
@ -67,23 +72,22 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
stub = conn.getRegionServerStub(loc.getServerName());
} catch (IOException e) {
onError(e,
() -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row)
+ "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
err -> conn.getLocator().updateCachedLocation(loc, err));
() -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) +
"' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
err -> conn.getLocator().updateCachedLocationOnError(loc, err));
return;
}
resetCallTimeout();
callable.call(controller, loc, stub).whenComplete(
(result, error) -> {
if (error != null) {
onError(error,
() -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in "
+ loc.getRegion().getEncodedName() + " of " + tableName + " failed",
err -> conn.getLocator().updateCachedLocation(loc, err));
return;
}
future.complete(result);
});
callable.call(controller, loc, stub).whenComplete((result, error) -> {
if (error != null) {
onError(error,
() -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
loc.getRegion().getEncodedName() + " of " + tableName + " failed",
err -> conn.getLocator().updateCachedLocationOnError(loc, err));
return;
}
future.complete(result);
});
}
@Override
@ -98,18 +102,17 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
} else {
locateTimeoutNs = -1L;
}
conn.getLocator()
.getRegionLocation(tableName, row, locateType, locateTimeoutNs)
.whenComplete(
(loc, error) -> {
if (error != null) {
onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName
+ " failed", err -> {
});
return;
}
call(loc);
});
addListener(
conn.getLocator().getRegionLocation(tableName, row, replicaId, locateType, locateTimeoutNs),
(loc, error) -> {
if (error != null) {
onError(error,
() -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> {
});
return;
}
call(loc);
});
}
@Override

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.client;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
@ -55,5 +54,30 @@ public interface AsyncTableRegionLocator {
* @param row Row to find.
* @param reload true to reload information or false to use cached information
*/
CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload);
default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
return getRegionLocation(row, RegionReplicaUtil.DEFAULT_REPLICA_ID, reload);
}
/**
* Finds the region with the given <code>replicaId</code> on which the given row is being served.
* <p>
* Returns the location of the region with the given <code>replicaId</code> to which the row
* belongs.
* @param row Row to find.
* @param replicaId the replica id of the region
*/
default CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId) {
return getRegionLocation(row, replicaId, false);
}
/**
* Finds the region with the given <code>replicaId</code> on which the given row is being served.
* <p>
* Returns the location of the region with the given <code>replicaId</code> to which the row
* belongs.
* @param row Row to find.
* @param replicaId the replica id of the region
* @param reload true to reload information or false to use cached information
*/
CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, boolean reload);
}
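
For callers, the replica-aware overloads compose naturally with the existing API. A hedged usage sketch (the connection setup, table name and row are illustrative, not from this patch):

Configuration conf = HBaseConfiguration.create();
try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
  AsyncTableRegionLocator locator = conn.getRegionLocator(TableName.valueOf("test"));
  // locate the secondary replica with replicaId = 1, using the cache if possible
  HRegionLocation secondary = locator.getRegionLocation(Bytes.toBytes("row"), 1).get();
  // the row-only overloads still default to the primary replica
  HRegionLocation primary = locator.getRegionLocation(Bytes.toBytes("row"), false).get();
}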

View File

@ -44,7 +44,9 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
}
@Override
public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
return locator.getRegionLocation(tableName, row, RegionLocateType.CURRENT, reload, -1L);
public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId,
boolean reload) {
return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload,
-1L);
}
}

View File

@ -38,6 +38,9 @@ public class ConnectionConfiguration {
public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second
public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;
public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND =
"hbase.client.primaryCallTimeout.get";
public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 10ms
private final long writeBufferSize;
private final long writeBufferPeriodicFlushTimeoutMs;
@ -86,7 +89,7 @@ public class ConnectionConfiguration {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
this.primaryCallTimeoutMicroSecond =
conf.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10ms
conf.getInt(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT);
this.replicaCallTimeoutMicroSecondScan =
conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms

View File

@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.RpcChannel;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -32,11 +32,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
import org.apache.hadoop.hbase.filter.BinaryComparator;
@ -45,9 +45,12 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@ -63,7 +66,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
/**
* The implementation of RawAsyncTable.
* <p>
* <p/>
* The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
* be finished inside the rpc framework thread, which means that the callbacks registered to the
* {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
@ -74,6 +77,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
@InterfaceAudience.Private
class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
private final AsyncConnectionImpl conn;
private final TableName tableName;
@ -204,58 +209,126 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
return conn.callerFactory.<T> single().table(tableName).row(row)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt);
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt);
}
private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
return newCaller(row.getRow(), rpcTimeoutNs);
}
private CompletableFuture<Result> get(Get get, int replicaId, long timeoutNs) {
return this.<Result> newCaller(get, timeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl
.<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
(c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
.replicaId(replicaId).call();
}
// Connect the two futures: if the src future is done, then mark the dst future as done; and if
// the dst future is done, then cancel the src future. This is used for timeline consistent read.
private <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture) {
addListener(srcFuture, (r, e) -> {
if (e != null) {
dstFuture.completeExceptionally(e);
} else {
dstFuture.complete(r);
}
});
// The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture.
// Notice that this is a bit tricky, as the execution chain may be 'complete src -> complete dst
// -> cancel src'. For now it seems to be fine, as CompletableFuture uses CAS to set the result
// first. If later this causes problems, we could use whenCompleteAsync to break the tie.
addListener(dstFuture, (r, e) -> srcFuture.cancel(false));
}
private void timelineConsistentGet(Get get, RegionLocations locs,
CompletableFuture<Result> future) {
if (future.isDone()) {
// do not send requests to secondary replicas if the future is done, i.e., the primary request
// has already finished.
return;
}
for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) {
CompletableFuture<Result> secondaryFuture = get(get, replicaId, readRpcTimeoutNs);
connect(secondaryFuture, future);
}
}
@Override
public CompletableFuture<Result> get(Get get) {
return this.<Result> newCaller(get, readRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl
.<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
(c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
.call();
CompletableFuture<Result> primaryFuture =
get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
if (get.getConsistency() == Consistency.STRONG) {
return primaryFuture;
}
// Timeline consistent read, where we will send requests to other region replicas
CompletableFuture<Result> future = new CompletableFuture<>();
connect(primaryFuture, future);
long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
long startNs = System.nanoTime();
addListener(conn.getLocator().getRegionLocations(tableName, get.getRow(),
RegionLocateType.CURRENT, false, readRpcTimeoutNs), (locs, error) -> {
if (error != null) {
LOG.warn(
"Failed to locate all the replicas for table={}, row='{}'," +
" give up timeline consistent read",
tableName, Bytes.toStringBinary(get.getRow()), error);
return;
}
if (locs.size() <= 1) {
LOG.warn(
"There are no secondary replicas for region {}," + " give up timeline consistent read",
locs.getDefaultRegionLocation().getRegion());
return;
}
long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
if (delayNs <= 0) {
timelineConsistentGet(get, locs, future);
} else {
AsyncConnectionImpl.RETRY_TIMER.newTimeout(
timeout -> timelineConsistentGet(get, locs, future), delayNs, TimeUnit.NANOSECONDS);
}
});
return future;
}
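
All of the fan-out above is driven purely by the consistency level on the Get; strong reads never leave the primary. A hedged usage sketch from the caller's side (table and row names are illustrative):

AsyncTable<?> table = conn.getTable(TableName.valueOf("test"));
Get get = new Get(Bytes.toBytes("row")).setConsistency(Consistency.TIMELINE);
table.get(get).thenAccept(result -> {
  if (result.isStale()) {
    // the answer came from a secondary replica and may lag the primary
  }
});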
@Override
public CompletableFuture<Void> put(Put put) {
return this.<Void> newCaller(put, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
put, RequestConverter::buildMutateRequest))
.call();
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
put, RequestConverter::buildMutateRequest))
.call();
}
@Override
public CompletableFuture<Void> delete(Delete delete) {
return this.<Void> newCaller(delete, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
stub, delete, RequestConverter::buildMutateRequest))
.call();
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
stub, delete, RequestConverter::buildMutateRequest))
.call();
}
@Override
public CompletableFuture<Result> append(Append append) {
checkHasFamilies(append);
return this.<Result> newCaller(append, rpcTimeoutNs)
.action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
.call();
.action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
.call();
}
@Override
public CompletableFuture<Result> increment(Increment increment) {
checkHasFamilies(increment);
return this.<Result> newCaller(increment, rpcTimeoutNs)
.action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
.call();
.action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
.call();
}
private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
@ -313,36 +386,36 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public CompletableFuture<Boolean> thenPut(Put put) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller,
loc, stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
(c, r) -> r.getProcessed()))
.call();
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
(c, r) -> r.getProcessed()))
.call();
}
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
(c, r) -> r.getProcessed()))
.call();
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
(c, r) -> r.getProcessed()))
.call();
}
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(mutation, rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
resp -> resp.getExists()))
.call();
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
resp -> resp.getExists()))
.call();
}
}
@ -375,10 +448,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
if (ex != null) {
future.completeExceptionally(ex instanceof IOException ? ex
: new IOException(
"Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
"Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
} else {
future.complete(respConverter
.apply((Result) multiResp.getResults().get(regionName).result.get(0)));
.apply((Result) multiResp.getResults().get(regionName).result.get(0)));
}
} catch (IOException e) {
future.completeExceptionally(e);
@ -399,7 +472,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
regionMutationBuilder.setAtomic(true);
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
}, resp -> null)).call();
}, resp -> null))
.call();
}
private Scan setDefaultScanConfig(Scan scan) {
@ -416,7 +490,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs,
maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
}
private long resultSize2CacheSize(long maxResultSize) {
@ -427,8 +501,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public ResultScanner getScanner(Scan scan) {
return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan),
resultSize2CacheSize(
scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
resultSize2CacheSize(
scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
}
@Override
@ -477,14 +551,14 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
.map(f -> f.<Void> thenApply(r -> null)).collect(toList());
.map(f -> f.<Void> thenApply(r -> null)).collect(toList());
}
private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
return conn.callerFactory.batch().table(tableName).actions(actions)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
}
@Override
@ -515,7 +589,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
region, row, rpcTimeoutNs, operationTimeoutNs);
region, row, rpcTimeoutNs, operationTimeoutNs);
S stub = stubMaker.apply(channel);
CompletableFuture<R> future = new CompletableFuture<>();
ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
@ -553,10 +627,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, CoprocessorCallback<R> callback,
List<HRegionLocation> locs, byte[] endKey, boolean endKeyInclusive,
AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc,
Throwable error) {
ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
if (error != null) {
callback.onError(error);
return;
@ -566,11 +639,11 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
if (locateFinished(region, endKey, endKeyInclusive)) {
locateFinished.set(true);
} else {
conn.getLocator()
.getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
operationTimeoutNs)
.whenComplete((l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey,
endKeyInclusive, locateFinished, unfinishedRequest, l, e));
addListener(
conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
operationTimeoutNs),
(l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
locateFinished, unfinishedRequest, l, e));
}
coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> {
if (e != null) {
@ -630,11 +703,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public void execute() {
conn.getLocator().getRegionLocation(tableName, startKey,
startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs)
.whenComplete(
(loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(),
endKey, endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
addListener(conn.getLocator().getRegionLocation(tableName, startKey,
startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs),
(loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey,
endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Helper class for processing futures.
*/
@InterfaceAudience.Private
public final class FutureUtils {
private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class);
private FutureUtils() {
}
/**
 * This method is used when you just want to add a listener to the given future. We will call
 * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the
 * {@code future}. Ignoring the return value of a Future is considered a bad practice as it may
 * suppress exceptions thrown from the code that completes the future, and this method will catch
 * all the exceptions thrown from the {@code action} to surface possible code bugs.
 * <p/>
 * And the error prone check will always report FutureReturnValueIgnored because every method in
 * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always
 * have one future that has not been checked. So we introduce this method and add a suppress
 * warnings annotation here.
*/
@SuppressWarnings("FutureReturnValueIgnored")
public static <T> void addListener(CompletableFuture<T> future,
BiConsumer<? super T, ? super Throwable> action) {
future.whenComplete((resp, error) -> {
try {
action.accept(resp, error);
} catch (Throwable t) {
LOG.error("Unexpected error caught when processing CompletableFuture", t);
}
});
}
}
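
A short usage sketch of the helper (the future and handler here are illustrative):

CompletableFuture<String> future = new CompletableFuture<>();
FutureUtils.addListener(future, (value, error) -> {
  if (error != null) {
    // failure path; note that a throwable escaping this handler is logged
    // by addListener instead of being silently dropped
    return;
  }
  System.out.println("got " + value);
});
future.complete("done");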

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.util.Bytes;
final class RegionReplicaTestHelper {
private RegionReplicaTestHelper() {
}
// waits for all meta replicas to have a region location
static void waitUntilAllMetaReplicasHavingRegionLocation(AsyncRegistry registry,
int regionReplication) throws IOException {
TestZKAsyncRegistry.TEST_UTIL.waitFor(
TestZKAsyncRegistry.TEST_UTIL.getConfiguration()
.getLong("hbase.client.sync.wait.timeout.msec", 60000),
200, true, new ExplainingPredicate<IOException>() {
@Override
public String explainFailure() throws IOException {
return "Not all meta replicas get assigned";
}
@Override
public boolean evaluate() throws IOException {
try {
RegionLocations locs = registry.getMetaRegionLocation().get();
if (locs.size() < regionReplication) {
return false;
}
for (int i = 0; i < regionReplication; i++) {
if (locs.getRegionLocation(i) == null) {
return false;
}
}
return true;
} catch (Exception e) {
TestZKAsyncRegistry.LOG.warn("Failed to get meta region locations", e);
return false;
}
}
});
}
static Optional<ServerName> getRSCarryingReplica(HBaseTestingUtility util, TableName tableName,
int replicaId) {
return util.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
.filter(rs -> rs.getRegions(tableName).stream()
.anyMatch(r -> r.getRegionInfo().getReplicaId() == replicaId))
.findAny().map(rs -> rs.getServerName());
}
/**
* Return the new location.
*/
static ServerName moveRegion(HBaseTestingUtility util, HRegionLocation currentLoc)
throws Exception {
ServerName serverName = currentLoc.getServerName();
RegionInfo regionInfo = currentLoc.getRegion();
TableName tableName = regionInfo.getTable();
int replicaId = regionInfo.getReplicaId();
ServerName newServerName = util.getHBaseCluster().getRegionServerThreads().stream()
.map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
.get();
util.getAdmin().move(regionInfo.getEncodedNameAsBytes(),
Bytes.toBytes(newServerName.getServerName()));
util.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
Optional<ServerName> newServerName = getRSCarryingReplica(util, tableName, replicaId);
return newServerName.isPresent() && !newServerName.get().equals(serverName);
}
@Override
public String explainFailure() throws Exception {
return regionInfo.getRegionNameAsString() + " is still on " + serverName;
}
});
return newServerName;
}
interface Locator {
RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
throws Exception;
void updateCachedLocationOnError(HRegionLocation loc, Throwable error) throws Exception;
}
static void testLocator(HBaseTestingUtility util, TableName tableName, Locator locator)
throws Exception {
RegionLocations locs =
locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false);
assertEquals(3, locs.size());
for (int i = 0; i < 3; i++) {
HRegionLocation loc = locs.getRegionLocation(i);
assertNotNull(loc);
ServerName serverName = getRSCarryingReplica(util, tableName, i).get();
assertEquals(serverName, loc.getServerName());
}
ServerName newServerName = moveRegion(util, locs.getDefaultRegionLocation());
// The cached location should not be changed
assertEquals(locs.getDefaultRegionLocation().getServerName(),
locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false)
.getDefaultRegionLocation().getServerName());
// should get the new location when reload = true
assertEquals(newServerName,
locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, true)
.getDefaultRegionLocation().getServerName());
// the cached location should be replaced
assertEquals(newServerName,
locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false)
.getDefaultRegionLocation().getServerName());
ServerName newServerName1 = moveRegion(util, locs.getRegionLocation(1));
ServerName newServerName2 = moveRegion(util, locs.getRegionLocation(2));
// The cached location should not be changed
assertEquals(locs.getRegionLocation(1).getServerName(),
locator.getRegionLocations(tableName, 1, false).getRegionLocation(1).getServerName());
// clear the cached location for replica 1
locator.updateCachedLocationOnError(locs.getRegionLocation(1), new NotServingRegionException());
// the cached location for replica 2 should not be changed
assertEquals(locs.getRegionLocation(2).getServerName(),
locator.getRegionLocations(tableName, 2, false).getRegionLocation(2).getServerName());
// should get the new location as we have cleared the old location
assertEquals(newServerName1,
locator.getRegionLocations(tableName, 1, false).getRegionLocation(1).getServerName());
// as the locate request fetches the locations for all replicas at once, we should also get the
// new location for replica 2
assertEquals(newServerName2,
locator.getRegionLocations(tableName, 2, false).getRegionLocation(2).getServerName());
}
}

View File

@ -17,20 +17,19 @@
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@ -42,7 +41,7 @@ public class TestAsyncMetaRegionLocator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncMetaRegionLocator.class);
HBaseClassTestRule.forClass(TestAsyncMetaRegionLocator.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@ -53,10 +52,11 @@ public class TestAsyncMetaRegionLocator {
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().set(BaseLoadBalancer.TABLES_ON_MASTER, "none");
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.waitUntilAllSystemRegionsAssigned();
TEST_UTIL.getAdmin().setBalancerRunning(false, true);
REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3);
TEST_UTIL.getAdmin().balancerSwitch(false, true);
LOCATOR = new AsyncMetaRegionLocator(REGISTRY);
}
@ -66,42 +66,21 @@ public class TestAsyncMetaRegionLocator {
TEST_UTIL.shutdownMiniCluster();
}
private Optional<ServerName> getRSCarryingMeta() {
return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
.map(t -> t.getRegionServer())
.filter(rs -> !rs.getRegions(TableName.META_TABLE_NAME).isEmpty()).findAny()
.map(rs -> rs.getServerName());
}
@Test
public void testReload() throws Exception {
ServerName serverName = getRSCarryingMeta().get();
assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName());
ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
.map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
.findAny().get();
TEST_UTIL.getAdmin().move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
Bytes.toBytes(newServerName.getServerName()));
TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
public void test() throws Exception {
testLocator(TEST_UTIL, TableName.META_TABLE_NAME, new Locator() {
@Override
public boolean evaluate() throws Exception {
Optional<ServerName> newServerName = getRSCarryingMeta();
return newServerName.isPresent() && !newServerName.get().equals(serverName);
public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
throws Exception {
LOCATOR.updateCachedLocationOnError(loc, error);
}
@Override
public String explainFailure() throws Exception {
return HRegionInfo.FIRST_META_REGIONINFO.getRegionNameAsString() + " is still on " +
serverName;
public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
throws Exception {
return LOCATOR.getRegionLocations(replicaId, reload).get();
}
});
// The cached location will not change
assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName());
// should get the new location when reload = true
assertEquals(newServerName, LOCATOR.getRegionLocation(true).get().getServerName());
// the cached location should be replaced
assertEquals(newServerName, LOCATOR.getRegionLocation(false).get().getServerName());
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@ -38,10 +39,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -58,7 +61,7 @@ public class TestAsyncNonMetaRegionLocator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@ -78,7 +81,7 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.getAdmin().balancerSwitch(false, true);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), User.getCurrent());
registry.getClusterId().get(), User.getCurrent());
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
SPLIT_KEYS = new byte[8][];
for (int i = 111; i < 999; i += 111) {
@ -109,11 +112,18 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.waitTableAvailable(TABLE_NAME);
}
private CompletableFuture<HRegionLocation> getDefaultRegionLocation(TableName tableName,
byte[] row, RegionLocateType locateType, boolean reload) {
return LOCATOR
.getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, locateType, reload)
.thenApply(RegionLocations::getDefaultRegionLocation);
}
@Test
public void testNoTable() throws InterruptedException {
for (RegionLocateType locateType : RegionLocateType.values()) {
try {
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
}
@ -126,7 +136,7 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
for (RegionLocateType locateType : RegionLocateType.values()) {
try {
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
}
@ -148,13 +158,13 @@ public class TestAsyncNonMetaRegionLocator {
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
ThreadLocalRandom.current().nextBytes(randKey);
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
LOCATOR.getRegionLocation(TABLE_NAME, randKey, locateType, false).get());
getDefaultRegionLocation(TABLE_NAME, randKey, locateType, false).get());
}
}
@ -179,12 +189,12 @@ public class TestAsyncNonMetaRegionLocator {
private ServerName[] getLocations(byte[][] startKeys) {
ServerName[] serverNames = new ServerName[startKeys.length];
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
.forEach(rs -> {
rs.getRegions(TABLE_NAME).forEach(r -> {
serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
Bytes::compareTo)] = rs.getServerName();
});
.forEach(rs -> {
rs.getRegions(TABLE_NAME).forEach(r -> {
serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(),
Bytes::compareTo)] = rs.getServerName();
});
});
return serverNames;
}
@ -196,8 +206,9 @@ public class TestAsyncNonMetaRegionLocator {
IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
try {
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
serverNames[i], LOCATOR
.getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false).get());
serverNames[i],
getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false)
.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@ -208,7 +219,7 @@ public class TestAsyncNonMetaRegionLocator {
try {
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
serverNames[i],
LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get());
getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@ -220,8 +231,7 @@ public class TestAsyncNonMetaRegionLocator {
n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
try {
assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
LOCATOR.getRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false)
.get());
getDefaultRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false).get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@ -232,29 +242,29 @@ public class TestAsyncNonMetaRegionLocator {
public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
createSingleRegionTable();
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
HRegionLocation loc = LOCATOR
.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
HRegionLocation loc =
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
.map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
.findAny().get();
.map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
.get();
TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegion().getEncodedName()),
Bytes.toBytes(newServerName.getServerName()));
while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName()
.equals(newServerName)) {
.equals(newServerName)) {
Thread.sleep(100);
}
// Should be same as it is in cache
assertSame(loc, LOCATOR
.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
LOCATOR.updateCachedLocation(loc, null);
assertSame(loc,
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
LOCATOR.updateCachedLocationOnError(loc, null);
// null error will not trigger a cache cleanup
assertSame(loc, LOCATOR
.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, LOCATOR
.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
assertSame(loc,
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException());
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
}
// usually locate after will return the same result, so we add a test to make it return different
@ -266,21 +276,21 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.createTable(TABLE_NAME, FAMILY, new byte[][] { splitKey });
TEST_UTIL.waitTableAvailable(TABLE_NAME);
HRegionLocation currentLoc =
LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get();
getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get();
ServerName currentServerName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
assertLocEquals(EMPTY_START_ROW, splitKey, currentServerName, currentLoc);
HRegionLocation afterLoc =
LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get();
getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get();
ServerName afterServerName =
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
.filter(rs -> rs.getRegions(TABLE_NAME).stream()
.anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey())))
.findAny().get().getServerName();
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
.filter(rs -> rs.getRegions(TABLE_NAME).stream()
.anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey())))
.findAny().get().getServerName();
assertLocEquals(splitKey, EMPTY_END_ROW, afterServerName, afterLoc);
assertSame(afterLoc,
LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get());
getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get());
}
// For HBASE-17402
@ -292,9 +302,9 @@ public class TestAsyncNonMetaRegionLocator {
ServerName[] serverNames = getLocations(startKeys);
for (int i = 0; i < 100; i++) {
LOCATOR.clearCache(TABLE_NAME);
List<CompletableFuture<HRegionLocation>> futures = IntStream.range(0, 1000)
.mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
.map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
List<CompletableFuture<HRegionLocation>> futures =
IntStream.range(0, 1000).mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
.map(r -> getDefaultRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
.collect(toList());
for (int j = 0; j < 1000; j++) {
int index = Math.min(8, j / 111);
@ -309,11 +319,11 @@ public class TestAsyncNonMetaRegionLocator {
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
.map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
.findAny().get();
.map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny()
.get();
Admin admin = TEST_UTIL.getAdmin();
RegionInfo region = admin.getRegions(TABLE_NAME).stream().findAny().get();
admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(newServerName.getServerName()));
@ -334,15 +344,15 @@ public class TestAsyncNonMetaRegionLocator {
// The cached location will not change
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
// should get the new location when reload = true
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get());
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get());
// the cached location should be replaced
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
}
@ -351,10 +361,32 @@ public class TestAsyncNonMetaRegionLocator {
public void testLocateBeforeLastRegion()
throws IOException, InterruptedException, ExecutionException {
createMultiRegionTable();
LOCATOR.getRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
getDefaultRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
HRegionLocation loc =
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_END_ROW, RegionLocateType.BEFORE, false).get();
getDefaultRegionLocation(TABLE_NAME, EMPTY_END_ROW, RegionLocateType.BEFORE, false).get();
// should locate to the last region
assertArrayEquals(loc.getRegion().getEndKey(), EMPTY_END_ROW);
}
@Test
public void testRegionReplicas() throws Exception {
TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3).build());
TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
testLocator(TEST_UTIL, TABLE_NAME, new Locator() {
@Override
public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
throws Exception {
LOCATOR.updateCachedLocationOnError(loc, error);
}
@Override
public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
throws Exception {
return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
RegionLocateType.CURRENT, reload).get();
}
});
}
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
@ -59,7 +60,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class);
HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -124,10 +125,10 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
TEST_UTIL.getAdmin().balancerSwitch(false, true);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), User.getCurrent());
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
.toArray(byte[][]::new);
TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
}
@@ -138,11 +139,11 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
TEST_UTIL.shutdownMiniCluster();
}
private void assertLocs(List<CompletableFuture<HRegionLocation>> futures)
private void assertLocs(List<CompletableFuture<RegionLocations>> futures)
throws InterruptedException, ExecutionException {
assertEquals(256, futures.size());
for (int i = 0; i < futures.size(); i++) {
HRegionLocation loc = futures.get(i).get();
HRegionLocation loc = futures.get(i).get().getDefaultRegionLocation();
if (i == 0) {
assertTrue(isEmptyStartRow(loc.getRegion().getStartKey()));
} else {
@@ -158,10 +159,11 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
@Test
public void test() throws InterruptedException, ExecutionException {
List<CompletableFuture<HRegionLocation>> futures =
IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
.map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
.collect(toList());
List<CompletableFuture<RegionLocations>> futures =
IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
.map(r -> LOCATOR.getRegionLocations(TABLE_NAME, r, RegionReplicaUtil.DEFAULT_REPLICA_ID,
RegionLocateType.CURRENT, false))
.collect(toList());
assertLocs(futures);
assertTrue("max allowed is " + MAX_ALLOWED + " but actual is " + MAX_CONCURRENCY.get(),
MAX_CONCURRENCY.get() <= MAX_ALLOWED);
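Aside: widening the futures from HRegionLocation to RegionLocations is what makes the locator replica-aware. A hedged sketch of reading individual replicas out of a single lookup (the row variable is illustrative; replica slots beyond 0 may be null until they have been located):

  RegionLocations locs = LOCATOR.getRegionLocations(TABLE_NAME, row,
    RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get();
  HRegionLocation primary = locs.getDefaultRegionLocation(); // replica 0
  HRegionLocation secondary = locs.getRegionLocation(1); // may be null if not yet cached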

View File

@@ -49,7 +49,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -73,7 +73,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
TEST_UTIL.waitTableAvailable(TABLE_NAME);
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), User.getCurrent());
}
@AfterClass
@@ -89,8 +89,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegion().getRegionName());
TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), Bytes.toBytes(
TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName()));
AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS)
.setMaxRetries(30).build();
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
// move back
@@ -110,8 +110,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
public void testMaxRetries() throws IOException, InterruptedException {
try {
CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
.maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
.action((controller, loc, stub) -> failedFuture()).call().get();
fail();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
@@ -123,8 +123,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
long startNs = System.nanoTime();
try {
CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS)
.pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
.action((controller, loc, stub) -> failedFuture()).call().get();
fail();
} catch (ExecutionException e) {
e.printStackTrace();
@@ -141,30 +141,30 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
AtomicInteger count = new AtomicInteger(0);
HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
AsyncRegionLocator mockedLocator =
  new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
    @Override
    CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
      RegionLocateType locateType, long timeoutNs) {
      int replicaId, RegionLocateType locateType, long timeoutNs) {
      if (tableName.equals(TABLE_NAME)) {
        CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
        if (count.getAndIncrement() == 0) {
          errorTriggered.set(true);
          future.completeExceptionally(new RuntimeException("Inject error!"));
        } else {
          future.complete(loc);
        }
        return future;
      } else {
        return super.getRegionLocation(tableName, row, locateType, timeoutNs);
        return super.getRegionLocation(tableName, row, replicaId, locateType, timeoutNs);
      }
    }
    @Override
    void updateCachedLocation(HRegionLocation loc, Throwable exception) {
    void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
    }
  };
try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(),
CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) {
@Override
AsyncRegionLocator getLocator() {
@@ -172,7 +172,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
}
}) {
AsyncTable<?> table = mockedConn.getTableBuilder(TABLE_NAME)
.setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
assertTrue(errorTriggered.get());
errorTriggered.set(false);

View File

@@ -69,8 +69,8 @@ public class TestAsyncTableLocatePrefetch {
@Test
public void test() throws InterruptedException, ExecutionException {
assertNotNull(LOCATOR
.getRegionLocation(TABLE_NAME, Bytes.toBytes("zzz"), RegionLocateType.CURRENT, false).get());
assertNotNull(LOCATOR.getRegionLocations(TABLE_NAME, Bytes.toBytes("zzz"),
RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get());
// we finish the request before adding the remaining results to the cache, so sleep a bit here
Thread.sleep(1000);
// confirm that the locations of all the regions have been cached.

View File

@@ -0,0 +1,204 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncTableRegionReplicasGet {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasGet.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("async");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] QUALIFIER = Bytes.toBytes("cq");
private static byte[] ROW = Bytes.toBytes("row");
private static byte[] VALUE = Bytes.toBytes("value");
private static AsyncConnection ASYNC_CONN;
@Rule
public TestName testName = new TestName();
@Parameter
public Supplier<AsyncTable<?>> getTable;
private static AsyncTable<?> getRawTable() {
return ASYNC_CONN.getTable(TABLE_NAME);
}
private static AsyncTable<?> getTable() {
return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
}
@Parameters
public static List<Object[]> params() {
return Arrays.asList(new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getRawTable },
new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getTable });
}
private static volatile boolean FAIL_PRIMARY_GET = false;
private static AtomicInteger PRIMARY_GET_COUNT = new AtomicInteger(0);
private static AtomicInteger SECONDARY_GET_COUNT = new AtomicInteger(0);
public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
List<Cell> result) throws IOException {
RegionInfo region = c.getEnvironment().getRegionInfo();
if (!region.getTable().equals(TABLE_NAME)) {
return;
}
if (region.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) {
SECONDARY_GET_COUNT.incrementAndGet();
} else {
PRIMARY_GET_COUNT.incrementAndGet();
if (FAIL_PRIMARY_GET) {
throw new IOException("Inject error");
}
}
}
}
private static boolean allReplicasHaveRow() throws IOException {
for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
if (region.get(new Get(ROW), false).isEmpty()) {
return false;
}
}
}
return true;
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// 10 mins
TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
TimeUnit.MINUTES.toMillis(10));
// 1 second
TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND,
TimeUnit.SECONDS.toMicros(1));
// set a small pause so we will retry very quickly
TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
// infinite retry
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.getAdmin()
.createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3)
.setCoprocessor(FailPrimaryGetCP.class.getName()).build());
TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
// this is the fastest way to let all replicas have the row
TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
IOUtils.closeQuietly(ASYNC_CONN);
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testNoReplicaRead() throws Exception {
FAIL_PRIMARY_GET = false;
SECONDARY_GET_COUNT.set(0);
AsyncTable<?> table = getTable.get();
Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
for (int i = 0; i < 1000; i++) {
assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
}
// the primary region is fine and the primary timeout is 1 second, which is long enough, so we
// should not send any requests to secondary replicas even if the consistency is timeline.
Thread.sleep(5000);
assertEquals(0, SECONDARY_GET_COUNT.get());
}
@Test
public void testReplicaRead() throws Exception {
// fail the primary get request
FAIL_PRIMARY_GET = true;
Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
// make sure that we can still get the value from secondary replicas
AsyncTable<?> table = getTable.get();
assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
// make sure that the primary request has been canceled
Thread.sleep(5000);
int count = PRIMARY_GET_COUNT.get();
Thread.sleep(10000);
assertEquals(count, PRIMARY_GET_COUNT.get());
}
}
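For context, the client-visible feature exercised by this new test is a timeline-consistent read that may be served by a secondary replica, with Result.isStale() reporting which path answered. A minimal, self-contained sketch under assumed defaults (the table name, row, and printed diagnostic are illustrative, not part of the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class TimelineGetExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
      AsyncTable<?> table = conn.getTable(TableName.valueOf("async"));
      // With TIMELINE consistency the client may fall back to secondary replicas
      // once the primary call timeout (hbase.client.primaryCallTimeout.get) fires.
      Get get = new Get(Bytes.toBytes("row")).setConsistency(Consistency.TIMELINE);
      Result result = table.get(get).get();
      // isStale() is true when a secondary replica served the read.
      System.out.println("stale=" + result.isStale());
    }
  }
}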

View File

@@ -25,7 +25,6 @@ import static org.junit.Assert.assertNotSame;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
@@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
@@ -52,43 +50,13 @@ public class TestZKAsyncRegistry {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestZKAsyncRegistry.class);
private static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class);
static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static ZKAsyncRegistry REGISTRY;
// waits for all replicas to have a region location
static void waitUntilAllReplicasHavingRegionLocation(TableName tbl) throws IOException {
TEST_UTIL.waitFor(
TEST_UTIL.getConfiguration().getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
new ExplainingPredicate<IOException>() {
@Override
public String explainFailure() throws IOException {
return TEST_UTIL.explainTableAvailability(tbl);
}
@Override
public boolean evaluate() throws IOException {
AtomicBoolean ready = new AtomicBoolean(true);
try {
RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
assertEquals(3, locs.getRegionLocations().length);
IntStream.range(0, 3).forEach(i -> {
HRegionLocation loc = locs.getRegionLocation(i);
if (loc == null) {
ready.set(false);
}
});
} catch (Exception e) {
ready.set(false);
}
return ready.get();
}
});
}
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
@@ -107,14 +75,14 @@ public class TestZKAsyncRegistry {
LOG.info("STARTED TEST");
String clusterId = REGISTRY.getClusterId().get();
String expectedClusterId = TEST_UTIL.getHBaseCluster().getMaster().getClusterId();
assertEquals("Expected " + expectedClusterId + ", found=" + clusterId,
expectedClusterId, clusterId);
assertEquals("Expected " + expectedClusterId + ", found=" + clusterId, expectedClusterId,
clusterId);
assertEquals(TEST_UTIL.getHBaseCluster().getClusterMetrics().getLiveServerMetrics().size(),
REGISTRY.getCurrentNrHRS().get().intValue());
assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
REGISTRY.getMasterAddress().get());
assertEquals(-1, REGISTRY.getMasterInfoPort().get().intValue());
waitUntilAllReplicasHavingRegionLocation(TableName.META_TABLE_NAME);
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3);
RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
assertEquals(3, locs.getRegionLocations().length);
IntStream.range(0, 3).forEach(i -> {