HBASE-17282 Reduce the redundant requests to meta table
This commit is contained in:
parent
da356069f2
commit
f041306cda
|
@ -95,7 +95,6 @@ class AsyncConnectionImpl implements AsyncConnection {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.user = user;
|
this.user = user;
|
||||||
this.connConf = new AsyncConnectionConfiguration(conf);
|
this.connConf = new AsyncConnectionConfiguration(conf);
|
||||||
this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
|
|
||||||
this.registry = AsyncRegistryFactory.getRegistry(conf);
|
this.registry = AsyncRegistryFactory.getRegistry(conf);
|
||||||
this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
|
this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -107,6 +106,7 @@ class AsyncConnectionImpl implements AsyncConnection {
|
||||||
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
||||||
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
|
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
|
||||||
this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
|
this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
|
||||||
|
this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
|
||||||
this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
|
this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
|
||||||
if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
|
if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
|
||||||
nonceGenerator = PerClientRandomNonceGenerator.get();
|
nonceGenerator = PerClientRandomNonceGenerator.get();
|
||||||
|
|
|
@ -0,0 +1,121 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.client.AsyncRegionLocator.*;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The asynchronous locator for meta region.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class AsyncMetaRegionLocator {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(AsyncMetaRegionLocator.class);
|
||||||
|
|
||||||
|
private final AsyncRegistry registry;
|
||||||
|
|
||||||
|
private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
|
||||||
|
|
||||||
|
private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
|
||||||
|
new AtomicReference<>();
|
||||||
|
|
||||||
|
AsyncMetaRegionLocator(AsyncRegistry registry) {
|
||||||
|
this.registry = registry;
|
||||||
|
}
|
||||||
|
|
||||||
|
CompletableFuture<HRegionLocation> getRegionLocation() {
|
||||||
|
for (;;) {
|
||||||
|
HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
|
||||||
|
if (metaRegionLocation != null) {
|
||||||
|
return CompletableFuture.completedFuture(metaRegionLocation);
|
||||||
|
}
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Meta region location cache is null, try fetching from registry.");
|
||||||
|
}
|
||||||
|
if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Start fetching meta region location from registry.");
|
||||||
|
}
|
||||||
|
CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
|
||||||
|
registry.getMetaRegionLocation().whenComplete((locs, error) -> {
|
||||||
|
if (error != null) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Failed to fetch meta region location from registry", error);
|
||||||
|
}
|
||||||
|
metaRelocateFuture.getAndSet(null).completeExceptionally(error);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
HRegionLocation loc = locs.getDefaultRegionLocation();
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("The fetched meta region location is " + loc);
|
||||||
|
}
|
||||||
|
// Here we update cache before reset future, so it is possible that someone can get a
|
||||||
|
// stale value. Consider this:
|
||||||
|
// 1. update cache
|
||||||
|
// 2. someone clear the cache and relocate again
|
||||||
|
// 3. the metaRelocateFuture is not null so the old future is used.
|
||||||
|
// 4. we clear metaRelocateFuture and complete the future in it with the value being
|
||||||
|
// cleared in step 2.
|
||||||
|
// But we do not think it is a big deal as it rarely happens, and even if it happens, the
|
||||||
|
// caller will retry again later, no correctness problems.
|
||||||
|
this.metaRegionLocation.set(loc);
|
||||||
|
metaRelocateFuture.set(null);
|
||||||
|
future.complete(loc);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
|
||||||
|
if (future != null) {
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void updateCachedLocation(HRegionLocation loc, Throwable exception) {
|
||||||
|
updateCachedLoation(loc, exception, l -> metaRegionLocation.get(), newLoc -> {
|
||||||
|
for (;;) {
|
||||||
|
HRegionLocation oldLoc = metaRegionLocation.get();
|
||||||
|
if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum()
|
||||||
|
|| oldLoc.getServerName().equals(newLoc.getServerName()))) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, l -> {
|
||||||
|
for (;;) {
|
||||||
|
HRegionLocation oldLoc = metaRegionLocation.get();
|
||||||
|
if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void clearCache() {
|
||||||
|
metaRegionLocation.set(null);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,487 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY;
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.NINES;
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.ZEROES;
|
||||||
|
import static org.apache.hadoop.hbase.HRegionInfo.createRegionName;
|
||||||
|
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
|
||||||
|
import static org.apache.hadoop.hbase.client.AsyncRegionLocator.updateCachedLoation;
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
|
||||||
|
import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
|
||||||
|
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.ConcurrentNavigableMap;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||||
|
import org.apache.hadoop.hbase.RegionLocations;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The asynchronous locator for regions other than meta.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class AsyncNonMetaRegionLocator {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(AsyncNonMetaRegionLocator.class);
|
||||||
|
|
||||||
|
static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
|
||||||
|
"hbase.client.meta.max.concurrent.locate.per.table";
|
||||||
|
|
||||||
|
private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
|
||||||
|
|
||||||
|
private final AsyncConnectionImpl conn;
|
||||||
|
|
||||||
|
private final int maxConcurrentLocateRequestPerTable;
|
||||||
|
|
||||||
|
private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private static final class LocateRequest {
|
||||||
|
|
||||||
|
public final byte[] row;
|
||||||
|
|
||||||
|
public final boolean locateToPrevious;
|
||||||
|
|
||||||
|
public LocateRequest(byte[] row, boolean locateToPrevious) {
|
||||||
|
this.row = row;
|
||||||
|
this.locateToPrevious = locateToPrevious;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Bytes.hashCode(row) ^ Boolean.hashCode(locateToPrevious);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (obj == null || obj.getClass() != LocateRequest.class) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
LocateRequest that = (LocateRequest) obj;
|
||||||
|
return locateToPrevious == that.locateToPrevious && Bytes.equals(row, that.row);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class TableCache {
|
||||||
|
|
||||||
|
public final ConcurrentNavigableMap<byte[], HRegionLocation> cache =
|
||||||
|
new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
|
||||||
|
|
||||||
|
public final Set<LocateRequest> pendingRequests = new HashSet<>();
|
||||||
|
|
||||||
|
public final Map<LocateRequest, CompletableFuture<HRegionLocation>> allRequests =
|
||||||
|
new HashMap<>();
|
||||||
|
|
||||||
|
public boolean hasQuota(int max) {
|
||||||
|
return pendingRequests.size() < max;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isPending(LocateRequest req) {
|
||||||
|
return pendingRequests.contains(req);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void send(LocateRequest req) {
|
||||||
|
pendingRequests.add(req);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
|
||||||
|
this.conn = conn;
|
||||||
|
this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
|
||||||
|
MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TableCache getTableCache(TableName tableName) {
|
||||||
|
return computeIfAbsent(cache, tableName, TableCache::new);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeFromCache(HRegionLocation loc) {
|
||||||
|
TableCache tableCache = cache.get(loc.getRegionInfo().getTable());
|
||||||
|
if (tableCache == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
tableCache.cache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> {
|
||||||
|
if (oldLoc.getSeqNum() > loc.getSeqNum()
|
||||||
|
|| !oldLoc.getServerName().equals(loc.getServerName())) {
|
||||||
|
return oldLoc;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// return whether we add this loc to cache
|
||||||
|
private boolean addToCache(TableCache tableCache, HRegionLocation loc) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Try adding " + loc + " to cache");
|
||||||
|
}
|
||||||
|
byte[] startKey = loc.getRegionInfo().getStartKey();
|
||||||
|
HRegionLocation oldLoc = tableCache.cache.putIfAbsent(startKey, loc);
|
||||||
|
if (oldLoc == null) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (oldLoc.getSeqNum() > loc.getSeqNum()
|
||||||
|
|| oldLoc.getServerName().equals(loc.getServerName())) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc
|
||||||
|
+ " is newer than us or has the same server name");
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return loc == tableCache.cache.compute(startKey, (k, oldValue) -> {
|
||||||
|
if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) {
|
||||||
|
return loc;
|
||||||
|
}
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue
|
||||||
|
+ " is newer than us or has the same server name."
|
||||||
|
+ " Maybe it is updated before we replace it");
|
||||||
|
}
|
||||||
|
return oldValue;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
|
||||||
|
justification = "Called by lambda expression")
|
||||||
|
private void addToCache(HRegionLocation loc) {
|
||||||
|
addToCache(getTableCache(loc.getRegionInfo().getTable()), loc);
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Try adding " + loc + " to cache");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean tryComplete(LocateRequest req, CompletableFuture<HRegionLocation> future,
|
||||||
|
HRegionLocation loc) {
|
||||||
|
if (future.isDone()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
boolean completed;
|
||||||
|
if (req.locateToPrevious) {
|
||||||
|
completed = Bytes.equals(loc.getRegionInfo().getEndKey(), req.row);
|
||||||
|
} else {
|
||||||
|
completed = loc.getRegionInfo().containsRow(req.row);
|
||||||
|
}
|
||||||
|
if (completed) {
|
||||||
|
future.complete(loc);
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void complete(TableName tableName, LocateRequest req, HRegionLocation loc,
|
||||||
|
Throwable error, String rowNameInErrorMsg) {
|
||||||
|
if (error != null) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Failed to locate region in '" + tableName + "', " + rowNameInErrorMsg + "='"
|
||||||
|
+ Bytes.toStringBinary(req.row) + "'",
|
||||||
|
error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LocateRequest toSend = null;
|
||||||
|
TableCache tableCache = getTableCache(tableName);
|
||||||
|
if (loc != null) {
|
||||||
|
if (!addToCache(tableCache, loc)) {
|
||||||
|
// someone is ahead of us.
|
||||||
|
synchronized (tableCache) {
|
||||||
|
tableCache.pendingRequests.remove(req);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
synchronized (tableCache) {
|
||||||
|
tableCache.pendingRequests.remove(req);
|
||||||
|
if (error instanceof DoNotRetryIOException) {
|
||||||
|
CompletableFuture<?> future = tableCache.allRequests.remove(req);
|
||||||
|
if (future != null) {
|
||||||
|
future.completeExceptionally(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (loc != null) {
|
||||||
|
for (Iterator<Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>>> iter =
|
||||||
|
tableCache.allRequests.entrySet().iterator(); iter.hasNext();) {
|
||||||
|
Map.Entry<LocateRequest, CompletableFuture<HRegionLocation>> entry = iter.next();
|
||||||
|
if (tryComplete(entry.getKey(), entry.getValue(), loc)) {
|
||||||
|
iter.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!tableCache.allRequests.isEmpty()
|
||||||
|
&& tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) {
|
||||||
|
LocateRequest[] candidates = tableCache.allRequests.keySet().stream()
|
||||||
|
.filter(r -> !tableCache.isPending(r)).toArray(LocateRequest[]::new);
|
||||||
|
if (candidates.length > 0) {
|
||||||
|
// TODO: use a better algorithm to send a request which is more likely to fetch a new
|
||||||
|
// location.
|
||||||
|
toSend = candidates[ThreadLocalRandom.current().nextInt(candidates.length)];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (toSend != null) {
|
||||||
|
if (toSend.locateToPrevious) {
|
||||||
|
locatePreviousInMeta(tableName, toSend);
|
||||||
|
} else {
|
||||||
|
locateInMeta(tableName, toSend);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onScanComplete(TableName tableName, LocateRequest req, List<Result> results,
|
||||||
|
Throwable error, String rowNameInErrorMsg) {
|
||||||
|
if (error != null) {
|
||||||
|
complete(tableName, req, null, error, rowNameInErrorMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (results.isEmpty()) {
|
||||||
|
complete(tableName, req, null, new TableNotFoundException(tableName), rowNameInErrorMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0));
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("The fetched location of '" + tableName + "', " + rowNameInErrorMsg + "='"
|
||||||
|
+ Bytes.toStringBinary(req.row) + "' is " + locs);
|
||||||
|
}
|
||||||
|
if (locs == null || locs.getDefaultRegionLocation() == null) {
|
||||||
|
complete(tableName, req, null,
|
||||||
|
new IOException(String.format("No location found for '%s', %s='%s'", tableName,
|
||||||
|
rowNameInErrorMsg, Bytes.toStringBinary(req.row))),
|
||||||
|
rowNameInErrorMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
HRegionLocation loc = locs.getDefaultRegionLocation();
|
||||||
|
HRegionInfo info = loc.getRegionInfo();
|
||||||
|
if (info == null) {
|
||||||
|
complete(tableName, req, null,
|
||||||
|
new IOException(String.format("HRegionInfo is null for '%s', %s='%s'", tableName,
|
||||||
|
rowNameInErrorMsg, Bytes.toStringBinary(req.row))),
|
||||||
|
rowNameInErrorMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!info.getTable().equals(tableName)) {
|
||||||
|
complete(tableName, req, null,
|
||||||
|
new TableNotFoundException(
|
||||||
|
"Table '" + tableName + "' was not found, got: '" + info.getTable() + "'"),
|
||||||
|
rowNameInErrorMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (info.isSplit()) {
|
||||||
|
complete(tableName, req, null,
|
||||||
|
new RegionOfflineException(
|
||||||
|
"the only available region for the required row is a split parent,"
|
||||||
|
+ " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"),
|
||||||
|
rowNameInErrorMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (info.isOffline()) {
|
||||||
|
complete(tableName, req, null,
|
||||||
|
new RegionOfflineException("the region is offline, could"
|
||||||
|
+ " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"),
|
||||||
|
rowNameInErrorMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (loc.getServerName() == null) {
|
||||||
|
complete(tableName, req, null,
|
||||||
|
new NoServerForRegionException(
|
||||||
|
String.format("No server address listed for region '%s', %s='%s'",
|
||||||
|
info.getRegionNameAsString(), rowNameInErrorMsg, Bytes.toStringBinary(req.row))),
|
||||||
|
rowNameInErrorMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (req.locateToPrevious && !Bytes.equals(info.getEndKey(), req.row)) {
|
||||||
|
complete(tableName, req, null,
|
||||||
|
new DoNotRetryIOException("The end key of '" + info.getRegionNameAsString() + "' is '"
|
||||||
|
+ Bytes.toStringBinary(info.getEndKey()) + "', expected '"
|
||||||
|
+ Bytes.toStringBinary(req.row) + "'"),
|
||||||
|
rowNameInErrorMsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
complete(tableName, req, loc, null, rowNameInErrorMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row) {
|
||||||
|
Map.Entry<byte[], HRegionLocation> entry = tableCache.cache.floorEntry(row);
|
||||||
|
if (entry == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
HRegionLocation loc = entry.getValue();
|
||||||
|
byte[] endKey = loc.getRegionInfo().getEndKey();
|
||||||
|
if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
|
||||||
|
+ Bytes.toStringBinary(row) + "'");
|
||||||
|
}
|
||||||
|
return loc;
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private HRegionLocation locatePreviousInCache(TableCache tableCache, TableName tableName,
|
||||||
|
byte[] startRowOfCurrentRegion) {
|
||||||
|
Map.Entry<byte[], HRegionLocation> entry;
|
||||||
|
if (isEmptyStopRow(startRowOfCurrentRegion)) {
|
||||||
|
entry = tableCache.cache.lastEntry();
|
||||||
|
} else {
|
||||||
|
entry = tableCache.cache.lowerEntry(startRowOfCurrentRegion);
|
||||||
|
}
|
||||||
|
if (entry == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
HRegionLocation loc = entry.getValue();
|
||||||
|
if (Bytes.equals(loc.getRegionInfo().getEndKey(), startRowOfCurrentRegion)) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Found " + loc + " in cache for '" + tableName + "', startRowOfCurrentRegion='"
|
||||||
|
+ Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
|
||||||
|
}
|
||||||
|
return loc;
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void locateInMeta(TableName tableName, LocateRequest req) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace(
|
||||||
|
"Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) + "' in meta");
|
||||||
|
}
|
||||||
|
byte[] metaKey = createRegionName(tableName, req.row, NINES, false);
|
||||||
|
conn.getRawTable(META_TABLE_NAME)
|
||||||
|
.smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
|
||||||
|
.whenComplete((results, error) -> onScanComplete(tableName, req, results, error, "row"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void locatePreviousInMeta(TableName tableName, LocateRequest req) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Try locate '" + tableName + "', startRowOfCurrentRegion='"
|
||||||
|
+ Bytes.toStringBinary(req.row) + "' in meta");
|
||||||
|
}
|
||||||
|
byte[] metaKey;
|
||||||
|
if (isEmptyStopRow(req.row)) {
|
||||||
|
byte[] binaryTableName = tableName.getName();
|
||||||
|
metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
|
||||||
|
} else {
|
||||||
|
metaKey = createRegionName(tableName, req.row, ZEROES, false);
|
||||||
|
}
|
||||||
|
conn.getRawTable(META_TABLE_NAME)
|
||||||
|
.smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
|
||||||
|
.whenComplete((results, error) -> onScanComplete(tableName, req, results, error,
|
||||||
|
"startRowOfCurrentRegion"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row,
|
||||||
|
boolean locateToPrevious) {
|
||||||
|
return locateToPrevious ? locatePreviousInCache(tableCache, tableName, row)
|
||||||
|
: locateInCache(tableCache, tableName, row);
|
||||||
|
}
|
||||||
|
|
||||||
|
// locateToPrevious is true means we will use the start key of a region to locate the region
|
||||||
|
// placed before it. Used for reverse scan. See the comment of
|
||||||
|
// AsyncRegionLocator.getPreviousRegionLocation.
|
||||||
|
private CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
|
||||||
|
boolean locateToPrevious) {
|
||||||
|
TableCache tableCache = getTableCache(tableName);
|
||||||
|
HRegionLocation loc = locateInCache(tableCache, tableName, row, locateToPrevious);
|
||||||
|
if (loc != null) {
|
||||||
|
return CompletableFuture.completedFuture(loc);
|
||||||
|
}
|
||||||
|
CompletableFuture<HRegionLocation> future;
|
||||||
|
LocateRequest req;
|
||||||
|
boolean sendRequest = false;
|
||||||
|
synchronized (tableCache) {
|
||||||
|
// check again
|
||||||
|
loc = locateInCache(tableCache, tableName, row, locateToPrevious);
|
||||||
|
if (loc != null) {
|
||||||
|
return CompletableFuture.completedFuture(loc);
|
||||||
|
}
|
||||||
|
req = new LocateRequest(row, locateToPrevious);
|
||||||
|
future = tableCache.allRequests.get(req);
|
||||||
|
if (future == null) {
|
||||||
|
future = new CompletableFuture<>();
|
||||||
|
tableCache.allRequests.put(req, future);
|
||||||
|
if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) {
|
||||||
|
tableCache.send(req);
|
||||||
|
sendRequest = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (sendRequest) {
|
||||||
|
if (locateToPrevious) {
|
||||||
|
locatePreviousInMeta(tableName, req);
|
||||||
|
} else {
|
||||||
|
locateInMeta(tableName, req);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row) {
|
||||||
|
return getRegionLocation(tableName, row, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Used for reverse scan. See the comment of AsyncRegionLocator.getPreviousRegionLocation.
|
||||||
|
// TODO: need to deal with region merge where the startRowOfCurrentRegion will not be the endRow
|
||||||
|
// of a region.
|
||||||
|
CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
|
||||||
|
byte[] startRowOfCurrentRegion) {
|
||||||
|
return getRegionLocation(tableName, startRowOfCurrentRegion, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
void updateCachedLocation(HRegionLocation loc, Throwable exception) {
|
||||||
|
updateCachedLoation(loc, exception, l -> {
|
||||||
|
TableCache tableCache = cache.get(l.getRegionInfo().getTable());
|
||||||
|
if (tableCache == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return tableCache.cache.get(l.getRegionInfo().getStartKey());
|
||||||
|
}, this::addToCache, this::removeFromCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
void clearCache(TableName tableName) {
|
||||||
|
TableCache tableCache = cache.remove(tableName);
|
||||||
|
if (tableCache == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
synchronized (tableCache) {
|
||||||
|
if (!tableCache.allRequests.isEmpty()) {
|
||||||
|
IOException error = new IOException("Cache cleared");
|
||||||
|
tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,42 +17,23 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY;
|
|
||||||
import static org.apache.hadoop.hbase.HConstants.NINES;
|
|
||||||
import static org.apache.hadoop.hbase.HConstants.ZEROES;
|
|
||||||
import static org.apache.hadoop.hbase.HRegionInfo.createRegionName;
|
|
||||||
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
|
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
|
||||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
|
|
||||||
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
|
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException;
|
||||||
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
|
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
|
||||||
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
|
|
||||||
|
|
||||||
import io.netty.util.HashedWheelTimer;
|
import io.netty.util.HashedWheelTimer;
|
||||||
import io.netty.util.Timeout;
|
import io.netty.util.Timeout;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import java.util.concurrent.ConcurrentNavigableMap;
|
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
|
||||||
import org.apache.hadoop.hbase.RegionLocations;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
||||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||||
|
@ -66,346 +47,73 @@ class AsyncRegionLocator {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(AsyncRegionLocator.class);
|
private static final Log LOG = LogFactory.getLog(AsyncRegionLocator.class);
|
||||||
|
|
||||||
private final AsyncConnectionImpl conn;
|
|
||||||
|
|
||||||
private final HashedWheelTimer retryTimer;
|
private final HashedWheelTimer retryTimer;
|
||||||
|
|
||||||
private final AtomicReference<HRegionLocation> metaRegionLocation = new AtomicReference<>();
|
private final AsyncMetaRegionLocator metaRegionLocator;
|
||||||
|
|
||||||
private final AtomicReference<CompletableFuture<HRegionLocation>> metaRelocateFuture =
|
private final AsyncNonMetaRegionLocator nonMetaRegionLocator;
|
||||||
new AtomicReference<>();
|
|
||||||
|
|
||||||
private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], HRegionLocation>> cache =
|
|
||||||
new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
|
AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
|
||||||
this.conn = conn;
|
this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry);
|
||||||
|
this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn);
|
||||||
this.retryTimer = retryTimer;
|
this.retryTimer = retryTimer;
|
||||||
}
|
}
|
||||||
|
|
||||||
private CompletableFuture<HRegionLocation> locateMetaRegion() {
|
|
||||||
for (;;) {
|
|
||||||
HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
|
|
||||||
if (metaRegionLocation != null) {
|
|
||||||
return CompletableFuture.completedFuture(metaRegionLocation);
|
|
||||||
}
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Meta region location cache is null, try fetching from registry.");
|
|
||||||
}
|
|
||||||
if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Start fetching meta region location from registry.");
|
|
||||||
}
|
|
||||||
CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
|
|
||||||
conn.registry.getMetaRegionLocation().whenComplete((locs, error) -> {
|
|
||||||
if (error != null) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Failed to fetch meta region location from registry", error);
|
|
||||||
}
|
|
||||||
metaRelocateFuture.getAndSet(null).completeExceptionally(error);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
HRegionLocation loc = locs.getDefaultRegionLocation();
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("The fetched meta region location is " + loc);
|
|
||||||
}
|
|
||||||
// Here we update cache before reset future, so it is possible that someone can get a
|
|
||||||
// stale value. Consider this:
|
|
||||||
// 1. update cache
|
|
||||||
// 2. someone clear the cache and relocate again
|
|
||||||
// 3. the metaRelocateFuture is not null so the old future is used.
|
|
||||||
// 4. we clear metaRelocateFuture and complete the future in it with the value being
|
|
||||||
// cleared in step 2.
|
|
||||||
// But we do not think it is a big deal as it rarely happens, and even if it happens, the
|
|
||||||
// caller will retry again later, no correctness problems.
|
|
||||||
this.metaRegionLocation.set(loc);
|
|
||||||
metaRelocateFuture.set(null);
|
|
||||||
future.complete(loc);
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
CompletableFuture<HRegionLocation> future = metaRelocateFuture.get();
|
|
||||||
if (future != null) {
|
|
||||||
return future;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static ConcurrentNavigableMap<byte[], HRegionLocation> createTableCache() {
|
|
||||||
return new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void removeFromCache(HRegionLocation loc) {
|
|
||||||
ConcurrentNavigableMap<byte[], HRegionLocation> tableCache =
|
|
||||||
cache.get(loc.getRegionInfo().getTable());
|
|
||||||
if (tableCache == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
tableCache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> {
|
|
||||||
if (oldLoc.getSeqNum() > loc.getSeqNum()
|
|
||||||
|| !oldLoc.getServerName().equals(loc.getServerName())) {
|
|
||||||
return oldLoc;
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private void addToCache(HRegionLocation loc) {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Try adding " + loc + " to cache");
|
|
||||||
}
|
|
||||||
ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = computeIfAbsent(cache,
|
|
||||||
loc.getRegionInfo().getTable(), AsyncRegionLocator::createTableCache);
|
|
||||||
byte[] startKey = loc.getRegionInfo().getStartKey();
|
|
||||||
HRegionLocation oldLoc = tableCache.putIfAbsent(startKey, loc);
|
|
||||||
if (oldLoc == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (oldLoc.getSeqNum() > loc.getSeqNum()
|
|
||||||
|| oldLoc.getServerName().equals(loc.getServerName())) {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc
|
|
||||||
+ " is newer than us or has the same server name");
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
tableCache.compute(startKey, (k, oldValue) -> {
|
|
||||||
if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) {
|
|
||||||
return loc;
|
|
||||||
}
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue
|
|
||||||
+ " is newer than us or has the same server name."
|
|
||||||
+ " Maybe it is updated before we replace it");
|
|
||||||
}
|
|
||||||
return oldValue;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private HRegionLocation locateInCache(TableName tableName, byte[] row) {
|
|
||||||
ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = cache.get(tableName);
|
|
||||||
if (tableCache == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
Map.Entry<byte[], HRegionLocation> entry = tableCache.floorEntry(row);
|
|
||||||
if (entry == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
HRegionLocation loc = entry.getValue();
|
|
||||||
byte[] endKey = loc.getRegionInfo().getEndKey();
|
|
||||||
if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
|
|
||||||
return loc;
|
|
||||||
} else {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void onScanComplete(CompletableFuture<HRegionLocation> future, TableName tableName,
|
|
||||||
byte[] row, List<Result> results, Throwable error, String rowNameInErrorMsg,
|
|
||||||
Consumer<HRegionLocation> otherCheck) {
|
|
||||||
if (error != null) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Failed to fetch location of '" + tableName + "', " + rowNameInErrorMsg + "='"
|
|
||||||
+ Bytes.toStringBinary(row) + "'",
|
|
||||||
error);
|
|
||||||
}
|
|
||||||
future.completeExceptionally(error);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (results.isEmpty()) {
|
|
||||||
future.completeExceptionally(new TableNotFoundException(tableName));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0));
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("The fetched location of '" + tableName + "', " + rowNameInErrorMsg + "='"
|
|
||||||
+ Bytes.toStringBinary(row) + "' is " + locs);
|
|
||||||
}
|
|
||||||
if (locs == null || locs.getDefaultRegionLocation() == null) {
|
|
||||||
future.completeExceptionally(
|
|
||||||
new IOException(String.format("No location found for '%s', %s='%s'", tableName,
|
|
||||||
rowNameInErrorMsg, Bytes.toStringBinary(row))));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
HRegionLocation loc = locs.getDefaultRegionLocation();
|
|
||||||
HRegionInfo info = loc.getRegionInfo();
|
|
||||||
if (info == null) {
|
|
||||||
future.completeExceptionally(
|
|
||||||
new IOException(String.format("HRegionInfo is null for '%s', %s='%s'", tableName,
|
|
||||||
rowNameInErrorMsg, Bytes.toStringBinary(row))));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (!info.getTable().equals(tableName)) {
|
|
||||||
future.completeExceptionally(new TableNotFoundException(
|
|
||||||
"Table '" + tableName + "' was not found, got: '" + info.getTable() + "'"));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (info.isSplit()) {
|
|
||||||
future.completeExceptionally(new RegionOfflineException(
|
|
||||||
"the only available region for the required row is a split parent,"
|
|
||||||
+ " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (info.isOffline()) {
|
|
||||||
future.completeExceptionally(new RegionOfflineException("the region is offline, could"
|
|
||||||
+ " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (loc.getServerName() == null) {
|
|
||||||
future.completeExceptionally(new NoServerForRegionException(
|
|
||||||
String.format("No server address listed for region '%s', %s='%s'",
|
|
||||||
info.getRegionNameAsString(), rowNameInErrorMsg, Bytes.toStringBinary(row))));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
otherCheck.accept(loc);
|
|
||||||
addToCache(loc);
|
|
||||||
future.complete(loc);
|
|
||||||
}
|
|
||||||
|
|
||||||
private CompletableFuture<HRegionLocation> locateInMeta(TableName tableName, byte[] row) {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(row) + "' in meta");
|
|
||||||
}
|
|
||||||
CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
|
|
||||||
byte[] metaKey = createRegionName(tableName, row, NINES, false);
|
|
||||||
conn.getRawTable(META_TABLE_NAME)
|
|
||||||
.smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
|
|
||||||
.whenComplete(
|
|
||||||
(results, error) -> onScanComplete(future, tableName, row, results, error, "row", loc -> {
|
|
||||||
}));
|
|
||||||
return future;
|
|
||||||
}
|
|
||||||
|
|
||||||
private CompletableFuture<HRegionLocation> locateRegion(TableName tableName, byte[] row) {
|
|
||||||
HRegionLocation loc = locateInCache(tableName, row);
|
|
||||||
if (loc != null) {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='"
|
|
||||||
+ Bytes.toStringBinary(row) + "'");
|
|
||||||
}
|
|
||||||
return CompletableFuture.completedFuture(loc);
|
|
||||||
}
|
|
||||||
return locateInMeta(tableName, row);
|
|
||||||
}
|
|
||||||
|
|
||||||
private CompletableFuture<HRegionLocation> withTimeout(CompletableFuture<HRegionLocation> future,
|
private CompletableFuture<HRegionLocation> withTimeout(CompletableFuture<HRegionLocation> future,
|
||||||
long timeoutNs, Supplier<String> timeoutMsg) {
|
long timeoutNs, Supplier<String> timeoutMsg) {
|
||||||
if (future.isDone() || timeoutNs <= 0) {
|
if (future.isDone() || timeoutNs <= 0) {
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
CompletableFuture<HRegionLocation> timeoutFuture = new CompletableFuture<>();
|
Timeout timeoutTask = retryTimer.newTimeout(t -> {
|
||||||
Timeout timeoutTask = retryTimer.newTimeout(
|
if (future.isDone()) {
|
||||||
t -> timeoutFuture.completeExceptionally(new TimeoutIOException(timeoutMsg.get())), timeoutNs,
|
return;
|
||||||
TimeUnit.NANOSECONDS);
|
}
|
||||||
future.whenComplete((loc, error) -> {
|
future.completeExceptionally(new TimeoutIOException(timeoutMsg.get()));
|
||||||
timeoutTask.cancel();
|
}, timeoutNs, TimeUnit.NANOSECONDS);
|
||||||
if (error != null) {
|
return future.whenComplete((loc, error) -> {
|
||||||
timeoutFuture.completeExceptionally(error);
|
if (error.getClass() != TimeoutIOException.class) {
|
||||||
} else {
|
// cancel timeout task if we are not completed by it.
|
||||||
timeoutFuture.complete(loc);
|
timeoutTask.cancel();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
return timeoutFuture;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
|
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
|
||||||
long timeoutNs) {
|
long timeoutNs) {
|
||||||
CompletableFuture<HRegionLocation> future =
|
CompletableFuture<HRegionLocation> future =
|
||||||
tableName.equals(META_TABLE_NAME) ? locateMetaRegion() : locateRegion(tableName, row);
|
tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation()
|
||||||
|
: nonMetaRegionLocator.getRegionLocation(tableName, row);
|
||||||
return withTimeout(future, timeoutNs,
|
return withTimeout(future, timeoutNs,
|
||||||
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
|
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
|
||||||
+ "ms) waiting for region location for " + tableName + ", row='"
|
+ "ms) waiting for region location for " + tableName + ", row='"
|
||||||
+ Bytes.toStringBinary(row) + "'");
|
+ Bytes.toStringBinary(row) + "'");
|
||||||
}
|
}
|
||||||
|
|
||||||
private HRegionLocation locatePreviousInCache(TableName tableName,
|
|
||||||
byte[] startRowOfCurrentRegion) {
|
|
||||||
ConcurrentNavigableMap<byte[], HRegionLocation> tableCache = cache.get(tableName);
|
|
||||||
if (tableCache == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
Map.Entry<byte[], HRegionLocation> entry;
|
|
||||||
if (isEmptyStopRow(startRowOfCurrentRegion)) {
|
|
||||||
entry = tableCache.lastEntry();
|
|
||||||
} else {
|
|
||||||
entry = tableCache.lowerEntry(startRowOfCurrentRegion);
|
|
||||||
}
|
|
||||||
if (entry == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
HRegionLocation loc = entry.getValue();
|
|
||||||
if (Bytes.equals(loc.getRegionInfo().getEndKey(), startRowOfCurrentRegion)) {
|
|
||||||
return loc;
|
|
||||||
} else {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private CompletableFuture<HRegionLocation> locatePreviousInMeta(TableName tableName,
|
|
||||||
byte[] startRowOfCurrentRegion) {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Try locate '" + tableName + "', startRowOfCurrentRegion='"
|
|
||||||
+ Bytes.toStringBinary(startRowOfCurrentRegion) + "' in meta");
|
|
||||||
}
|
|
||||||
byte[] metaKey;
|
|
||||||
if (isEmptyStopRow(startRowOfCurrentRegion)) {
|
|
||||||
byte[] binaryTableName = tableName.getName();
|
|
||||||
metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
|
|
||||||
} else {
|
|
||||||
metaKey = createRegionName(tableName, startRowOfCurrentRegion, ZEROES, false);
|
|
||||||
}
|
|
||||||
CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
|
|
||||||
conn.getRawTable(META_TABLE_NAME)
|
|
||||||
.smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
|
|
||||||
.whenComplete((results, error) -> onScanComplete(future, tableName, startRowOfCurrentRegion,
|
|
||||||
results, error, "startRowOfCurrentRegion", loc -> {
|
|
||||||
HRegionInfo info = loc.getRegionInfo();
|
|
||||||
if (!Bytes.equals(info.getEndKey(), startRowOfCurrentRegion)) {
|
|
||||||
future.completeExceptionally(new IOException("The end key of '"
|
|
||||||
+ info.getRegionNameAsString() + "' is '" + Bytes.toStringBinary(info.getEndKey())
|
|
||||||
+ "', expected '" + Bytes.toStringBinary(startRowOfCurrentRegion) + "'"));
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
return future;
|
|
||||||
}
|
|
||||||
|
|
||||||
private CompletableFuture<HRegionLocation> locatePreviousRegion(TableName tableName,
|
|
||||||
byte[] startRowOfCurrentRegion) {
|
|
||||||
HRegionLocation loc = locatePreviousInCache(tableName, startRowOfCurrentRegion);
|
|
||||||
if (loc != null) {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Found " + loc + " in cache for '" + tableName + "', startRowOfCurrentRegion='"
|
|
||||||
+ Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
|
|
||||||
}
|
|
||||||
return CompletableFuture.completedFuture(loc);
|
|
||||||
}
|
|
||||||
return locatePreviousInMeta(tableName, startRowOfCurrentRegion);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Locate the previous region using the current regions start key. Used for reverse scan.
|
* Locate the previous region using the current regions start key. Used for reverse scan as the
|
||||||
* <p>
|
* end key is not included in a region so we need to treat it differently.
|
||||||
* TODO: need to deal with region merge where the startRowOfCurrentRegion will not be the endRow
|
|
||||||
* of a region.
|
|
||||||
*/
|
*/
|
||||||
CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
|
CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
|
||||||
byte[] startRowOfCurrentRegion, long timeoutNs) {
|
byte[] startRowOfCurrentRegion, long timeoutNs) {
|
||||||
CompletableFuture<HRegionLocation> future = tableName.equals(META_TABLE_NAME)
|
// meta region can not be split right now so we call the same method as getRegionLocation.
|
||||||
? locateMetaRegion() : locatePreviousRegion(tableName, startRowOfCurrentRegion);
|
// Change it later if the meta table can have more than one regions.
|
||||||
|
CompletableFuture<HRegionLocation> future =
|
||||||
|
tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation()
|
||||||
|
: nonMetaRegionLocator.getPreviousRegionLocation(tableName, startRowOfCurrentRegion);
|
||||||
return withTimeout(future, timeoutNs,
|
return withTimeout(future, timeoutNs,
|
||||||
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
|
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
|
||||||
+ "ms) waiting for region location for " + tableName + ", startRowOfCurrentRegion='"
|
+ "ms) waiting for region location for " + tableName + ", startRowOfCurrentRegion='"
|
||||||
+ Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
|
+ Bytes.toStringBinary(startRowOfCurrentRegion) + "'");
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
|
static boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
|
||||||
// Do not need to update if no such location, or the location is newer, or the location is not
|
// Do not need to update if no such location, or the location is newer, or the location is not
|
||||||
// same with us
|
// same with us
|
||||||
return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum()
|
return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum()
|
||||||
&& oldLoc.getServerName().equals(loc.getServerName());
|
&& oldLoc.getServerName().equals(loc.getServerName());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateCachedLoation(HRegionLocation loc, Throwable exception,
|
static void updateCachedLoation(HRegionLocation loc, Throwable exception,
|
||||||
Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
|
Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
|
||||||
Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
|
Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
|
||||||
HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
|
HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
|
||||||
|
@ -445,34 +153,9 @@ class AsyncRegionLocator {
|
||||||
|
|
||||||
void updateCachedLocation(HRegionLocation loc, Throwable exception) {
|
void updateCachedLocation(HRegionLocation loc, Throwable exception) {
|
||||||
if (loc.getRegionInfo().isMetaTable()) {
|
if (loc.getRegionInfo().isMetaTable()) {
|
||||||
updateCachedLoation(loc, exception, l -> metaRegionLocation.get(), newLoc -> {
|
metaRegionLocator.updateCachedLocation(loc, exception);
|
||||||
for (;;) {
|
|
||||||
HRegionLocation oldLoc = metaRegionLocation.get();
|
|
||||||
if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum()
|
|
||||||
|| oldLoc.getServerName().equals(newLoc.getServerName()))) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, l -> {
|
|
||||||
for (;;) {
|
|
||||||
HRegionLocation oldLoc = metaRegionLocation.get();
|
|
||||||
if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
} else {
|
||||||
updateCachedLoation(loc, exception, l -> {
|
nonMetaRegionLocator.updateCachedLocation(loc, exception);
|
||||||
ConcurrentNavigableMap<byte[], HRegionLocation> tableCache =
|
|
||||||
cache.get(l.getRegionInfo().getTable());
|
|
||||||
if (tableCache == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return tableCache.get(l.getRegionInfo().getStartKey());
|
|
||||||
}, this::addToCache, this::removeFromCache);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -480,6 +163,10 @@ class AsyncRegionLocator {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Clear meta cache for " + tableName);
|
LOG.debug("Clear meta cache for " + tableName);
|
||||||
}
|
}
|
||||||
cache.remove(tableName);
|
if (tableName.equals(META_TABLE_NAME)) {
|
||||||
|
metaRegionLocator.clearCache();
|
||||||
|
} else {
|
||||||
|
nonMetaRegionLocator.clearCache(tableName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,7 @@ import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
@ -97,7 +98,7 @@ public class TestAsyncGetMultiThread {
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDown() throws Exception {
|
public static void tearDown() throws Exception {
|
||||||
CONN.close();
|
IOUtils.closeQuietly(CONN);
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
@ -50,7 +51,7 @@ import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
@Category({ MediumTests.class, ClientTests.class })
|
@Category({ MediumTests.class, ClientTests.class })
|
||||||
public class TestAsyncRegionLocator {
|
public class TestAsyncNonMetaRegionLocator {
|
||||||
|
|
||||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
@ -60,7 +61,7 @@ public class TestAsyncRegionLocator {
|
||||||
|
|
||||||
private static AsyncConnectionImpl CONN;
|
private static AsyncConnectionImpl CONN;
|
||||||
|
|
||||||
private static AsyncRegionLocator LOCATOR;
|
private static AsyncNonMetaRegionLocator LOCATOR;
|
||||||
|
|
||||||
private static byte[][] SPLIT_KEYS;
|
private static byte[][] SPLIT_KEYS;
|
||||||
|
|
||||||
|
@ -69,7 +70,7 @@ public class TestAsyncRegionLocator {
|
||||||
TEST_UTIL.startMiniCluster(3);
|
TEST_UTIL.startMiniCluster(3);
|
||||||
TEST_UTIL.getAdmin().setBalancerRunning(false, true);
|
TEST_UTIL.getAdmin().setBalancerRunning(false, true);
|
||||||
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
|
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
|
||||||
LOCATOR = CONN.getLocator();
|
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
|
||||||
SPLIT_KEYS = new byte[8][];
|
SPLIT_KEYS = new byte[8][];
|
||||||
for (int i = 111; i < 999; i += 111) {
|
for (int i = 111; i < 999; i += 111) {
|
||||||
SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
|
SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
|
||||||
|
@ -78,7 +79,7 @@ public class TestAsyncRegionLocator {
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDown() throws Exception {
|
public static void tearDown() throws Exception {
|
||||||
CONN.close();
|
IOUtils.closeQuietly(CONN);
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,12 +103,12 @@ public class TestAsyncRegionLocator {
|
||||||
@Test
|
@Test
|
||||||
public void testNoTable() throws InterruptedException {
|
public void testNoTable() throws InterruptedException {
|
||||||
try {
|
try {
|
||||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get();
|
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
|
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW, 0L).get();
|
LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
|
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
|
||||||
}
|
}
|
||||||
|
@ -118,12 +119,12 @@ public class TestAsyncRegionLocator {
|
||||||
createSingleRegionTable();
|
createSingleRegionTable();
|
||||||
TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
|
TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
|
||||||
try {
|
try {
|
||||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get();
|
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
|
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW, 0L).get();
|
LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get();
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
|
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
|
||||||
}
|
}
|
||||||
|
@ -143,17 +144,17 @@ public class TestAsyncRegionLocator {
|
||||||
createSingleRegionTable();
|
createSingleRegionTable();
|
||||||
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
|
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
|
||||||
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
|
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
|
||||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
|
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
|
||||||
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
|
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
|
||||||
LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
|
LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
|
||||||
byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
|
byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
|
||||||
ThreadLocalRandom.current().nextBytes(randKey);
|
ThreadLocalRandom.current().nextBytes(randKey);
|
||||||
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
|
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
|
||||||
LOCATOR.getRegionLocation(TABLE_NAME, randKey, 0L).get());
|
LOCATOR.getRegionLocation(TABLE_NAME, randKey).get());
|
||||||
// Use a key which is not the endKey of a region will cause error
|
// Use a key which is not the endKey of a region will cause error
|
||||||
try {
|
try {
|
||||||
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
|
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
|
||||||
LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }, 0L).get());
|
LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }).get());
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
assertThat(e.getCause(), instanceOf(IOException.class));
|
assertThat(e.getCause(), instanceOf(IOException.class));
|
||||||
assertTrue(e.getCause().getMessage().contains("end key of"));
|
assertTrue(e.getCause().getMessage().contains("end key of"));
|
||||||
|
@ -193,7 +194,7 @@ public class TestAsyncRegionLocator {
|
||||||
IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
|
IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
|
||||||
try {
|
try {
|
||||||
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
|
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
|
||||||
serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], 0L).get());
|
serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i]).get());
|
||||||
} catch (InterruptedException | ExecutionException e) {
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
@ -204,7 +205,7 @@ public class TestAsyncRegionLocator {
|
||||||
n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
|
n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
|
||||||
try {
|
try {
|
||||||
assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
|
assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
|
||||||
LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i], 0L).get());
|
LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i]).get());
|
||||||
} catch (InterruptedException | ExecutionException e) {
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
@ -215,7 +216,7 @@ public class TestAsyncRegionLocator {
|
||||||
public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
|
public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
|
||||||
createSingleRegionTable();
|
createSingleRegionTable();
|
||||||
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
|
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
|
||||||
HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get();
|
HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get();
|
||||||
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
|
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
|
||||||
ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
|
ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
|
||||||
.map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
|
.map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
|
||||||
|
@ -228,12 +229,12 @@ public class TestAsyncRegionLocator {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
// Should be same as it is in cache
|
// Should be same as it is in cache
|
||||||
assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
|
assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
|
||||||
LOCATOR.updateCachedLocation(loc, null);
|
LOCATOR.updateCachedLocation(loc, null);
|
||||||
// null error will not trigger a cache cleanup
|
// null error will not trigger a cache cleanup
|
||||||
assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
|
assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
|
||||||
LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
|
LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
|
||||||
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
|
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
|
||||||
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get());
|
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get());
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,159 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static java.util.stream.Collectors.toCollection;
|
||||||
|
import static java.util.stream.Collectors.toList;
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
|
||||||
|
import static org.apache.hadoop.hbase.client.AsyncNonMetaRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE;
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
|
||||||
|
import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ MediumTests.class, ClientTests.class })
|
||||||
|
public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static TableName TABLE_NAME = TableName.valueOf("async");
|
||||||
|
|
||||||
|
private static byte[] FAMILY = Bytes.toBytes("cf");
|
||||||
|
|
||||||
|
private static AsyncConnectionImpl CONN;
|
||||||
|
|
||||||
|
private static AsyncNonMetaRegionLocator LOCATOR;
|
||||||
|
|
||||||
|
private static byte[][] SPLIT_KEYS;
|
||||||
|
|
||||||
|
private static int MAX_ALLOWED = 2;
|
||||||
|
|
||||||
|
private static AtomicInteger CONCURRENCY = new AtomicInteger(0);
|
||||||
|
|
||||||
|
private static AtomicInteger MAX_CONCURRENCY = new AtomicInteger(0);
|
||||||
|
|
||||||
|
public static final class CountingRegionObserver extends BaseRegionObserver {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
|
||||||
|
RegionScanner s) throws IOException {
|
||||||
|
if (e.getEnvironment().getRegionInfo().isMetaTable()) {
|
||||||
|
int concurrency = CONCURRENCY.incrementAndGet();
|
||||||
|
for (;;) {
|
||||||
|
int max = MAX_CONCURRENCY.get();
|
||||||
|
if (concurrency <= max) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (MAX_CONCURRENCY.compareAndSet(max, concurrency)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Threads.sleepWithoutInterrupt(10);
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postScannerClose(ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s)
|
||||||
|
throws IOException {
|
||||||
|
if (e.getEnvironment().getRegionInfo().isMetaTable()) {
|
||||||
|
CONCURRENCY.decrementAndGet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
conf.set(REGION_COPROCESSOR_CONF_KEY, CountingRegionObserver.class.getName());
|
||||||
|
conf.setInt(MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, MAX_ALLOWED);
|
||||||
|
TEST_UTIL.startMiniCluster(3);
|
||||||
|
TEST_UTIL.getAdmin().setBalancerRunning(false, true);
|
||||||
|
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
|
||||||
|
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
|
||||||
|
SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
|
||||||
|
.toArray(byte[][]::new);
|
||||||
|
TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
|
||||||
|
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
IOUtils.closeQuietly(CONN);
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertLocs(List<CompletableFuture<HRegionLocation>> futures)
|
||||||
|
throws InterruptedException, ExecutionException {
|
||||||
|
assertEquals(256, futures.size());
|
||||||
|
for (int i = 0; i < futures.size(); i++) {
|
||||||
|
HRegionLocation loc = futures.get(i).get();
|
||||||
|
if (i == 0) {
|
||||||
|
assertTrue(isEmptyStartRow(loc.getRegionInfo().getStartKey()));
|
||||||
|
} else {
|
||||||
|
assertEquals(String.format("%02x", i), Bytes.toString(loc.getRegionInfo().getStartKey()));
|
||||||
|
}
|
||||||
|
if (i == futures.size() - 1) {
|
||||||
|
assertTrue(isEmptyStopRow(loc.getRegionInfo().getEndKey()));
|
||||||
|
} else {
|
||||||
|
assertEquals(String.format("%02x", i + 1), Bytes.toString(loc.getRegionInfo().getEndKey()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws InterruptedException, ExecutionException {
|
||||||
|
List<CompletableFuture<HRegionLocation>> futures = IntStream.range(0, 128)
|
||||||
|
.mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
|
||||||
|
.map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r)).collect(toCollection(ArrayList::new));
|
||||||
|
futures.addAll(IntStream.range(129, 257)
|
||||||
|
.mapToObj(i -> i < 256 ? Bytes.toBytes(String.format("%02x", i)) : EMPTY_START_ROW)
|
||||||
|
.map(r -> LOCATOR.getPreviousRegionLocation(TABLE_NAME, r)).collect(toList()));
|
||||||
|
assertLocs(futures);
|
||||||
|
assertTrue(MAX_CONCURRENCY.get() <= MAX_ALLOWED);
|
||||||
|
}
|
||||||
|
}
|
|
@ -30,6 +30,7 @@ import java.io.IOException;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
@ -90,7 +91,7 @@ public class TestAsyncRegionLocatorTimeout {
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDown() throws Exception {
|
public static void tearDown() throws Exception {
|
||||||
CONN.close();
|
IOUtils.closeQuietly(CONN);
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
@ -103,7 +104,7 @@ public class TestAsyncTable {
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDownAfterClass() throws Exception {
|
public static void tearDownAfterClass() throws Exception {
|
||||||
ASYNC_CONN.close();
|
IOUtils.closeQuietly(ASYNC_CONN);
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
@ -88,7 +89,7 @@ public class TestAsyncTableNoncedRetry {
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDownAfterClass() throws Exception {
|
public static void tearDownAfterClass() throws Exception {
|
||||||
ASYNC_CONN.close();
|
IOUtils.closeQuietly(ASYNC_CONN);
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue