diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 92785fb62fb..d660b02cdfc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -95,7 +95,6 @@ class AsyncConnectionImpl implements AsyncConnection { this.conf = conf; this.user = user; this.connConf = new AsyncConnectionConfiguration(conf); - this.locator = new AsyncRegionLocator(this, RETRY_TIMER); this.registry = AsyncRegistryFactory.getRegistry(conf); this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> { if (LOG.isDebugEnabled()) { @@ -107,6 +106,7 @@ class AsyncConnectionImpl implements AsyncConnection { this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT); + this.locator = new AsyncRegionLocator(this, RETRY_TIMER); this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER); if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { nonceGenerator = PerClientRandomNonceGenerator.get(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java new file mode 100644 index 00000000000..5b7a68f1fd5 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.AsyncRegionLocator.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * The asynchronous locator for meta region. 
+ */ +@InterfaceAudience.Private +class AsyncMetaRegionLocator { + + private static final Log LOG = LogFactory.getLog(AsyncMetaRegionLocator.class); + + private final AsyncRegistry registry; + + private final AtomicReference metaRegionLocation = new AtomicReference<>(); + + private final AtomicReference> metaRelocateFuture = + new AtomicReference<>(); + + AsyncMetaRegionLocator(AsyncRegistry registry) { + this.registry = registry; + } + + CompletableFuture getRegionLocation() { + for (;;) { + HRegionLocation metaRegionLocation = this.metaRegionLocation.get(); + if (metaRegionLocation != null) { + return CompletableFuture.completedFuture(metaRegionLocation); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Meta region location cache is null, try fetching from registry."); + } + if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Start fetching meta region location from registry."); + } + CompletableFuture future = metaRelocateFuture.get(); + registry.getMetaRegionLocation().whenComplete((locs, error) -> { + if (error != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to fetch meta region location from registry", error); + } + metaRelocateFuture.getAndSet(null).completeExceptionally(error); + return; + } + HRegionLocation loc = locs.getDefaultRegionLocation(); + if (LOG.isDebugEnabled()) { + LOG.debug("The fetched meta region location is " + loc); + } + // Here we update cache before reset future, so it is possible that someone can get a + // stale value. Consider this: + // 1. update cache + // 2. someone clear the cache and relocate again + // 3. the metaRelocateFuture is not null so the old future is used. + // 4. we clear metaRelocateFuture and complete the future in it with the value being + // cleared in step 2. + // But we do not think it is a big deal as it rarely happens, and even if it happens, the + // caller will retry again later, no correctness problems. + this.metaRegionLocation.set(loc); + metaRelocateFuture.set(null); + future.complete(loc); + }); + } else { + CompletableFuture future = metaRelocateFuture.get(); + if (future != null) { + return future; + } + } + } + } + + void updateCachedLocation(HRegionLocation loc, Throwable exception) { + updateCachedLoation(loc, exception, l -> metaRegionLocation.get(), newLoc -> { + for (;;) { + HRegionLocation oldLoc = metaRegionLocation.get(); + if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum() + || oldLoc.getServerName().equals(newLoc.getServerName()))) { + return; + } + if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) { + return; + } + } + }, l -> { + for (;;) { + HRegionLocation oldLoc = metaRegionLocation.get(); + if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) { + return; + } + } + }); + } + + void clearCache() { + metaRegionLocation.set(null); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java new file mode 100644 index 00000000000..c22d2103545 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -0,0 +1,487 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY; +import static org.apache.hadoop.hbase.HConstants.NINES; +import static org.apache.hadoop.hbase.HConstants.ZEROES; +import static org.apache.hadoop.hbase.HRegionInfo.createRegionName; +import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; +import static org.apache.hadoop.hbase.client.AsyncRegionLocator.updateCachedLoation; +import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; +import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * The asynchronous locator for regions other than meta. 
+ */ +@InterfaceAudience.Private +class AsyncNonMetaRegionLocator { + + private static final Log LOG = LogFactory.getLog(AsyncNonMetaRegionLocator.class); + + static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = + "hbase.client.meta.max.concurrent.locate.per.table"; + + private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8; + + private final AsyncConnectionImpl conn; + + private final int maxConcurrentLocateRequestPerTable; + + private final ConcurrentMap cache = new ConcurrentHashMap<>(); + + private static final class LocateRequest { + + public final byte[] row; + + public final boolean locateToPrevious; + + public LocateRequest(byte[] row, boolean locateToPrevious) { + this.row = row; + this.locateToPrevious = locateToPrevious; + } + + @Override + public int hashCode() { + return Bytes.hashCode(row) ^ Boolean.hashCode(locateToPrevious); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != LocateRequest.class) { + return false; + } + LocateRequest that = (LocateRequest) obj; + return locateToPrevious == that.locateToPrevious && Bytes.equals(row, that.row); + } + } + + private static final class TableCache { + + public final ConcurrentNavigableMap cache = + new ConcurrentSkipListMap<>(BYTES_COMPARATOR); + + public final Set pendingRequests = new HashSet<>(); + + public final Map> allRequests = + new HashMap<>(); + + public boolean hasQuota(int max) { + return pendingRequests.size() < max; + } + + public boolean isPending(LocateRequest req) { + return pendingRequests.contains(req); + } + + public void send(LocateRequest req) { + pendingRequests.add(req); + } + } + + AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) { + this.conn = conn; + this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt( + MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE); + } + + private TableCache getTableCache(TableName tableName) { + return computeIfAbsent(cache, tableName, TableCache::new); + } + + private void removeFromCache(HRegionLocation loc) { + TableCache tableCache = cache.get(loc.getRegionInfo().getTable()); + if (tableCache == null) { + return; + } + tableCache.cache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> { + if (oldLoc.getSeqNum() > loc.getSeqNum() + || !oldLoc.getServerName().equals(loc.getServerName())) { + return oldLoc; + } + return null; + }); + } + + // return whether we add this loc to cache + private boolean addToCache(TableCache tableCache, HRegionLocation loc) { + if (LOG.isTraceEnabled()) { + LOG.trace("Try adding " + loc + " to cache"); + } + byte[] startKey = loc.getRegionInfo().getStartKey(); + HRegionLocation oldLoc = tableCache.cache.putIfAbsent(startKey, loc); + if (oldLoc == null) { + return true; + } + if (oldLoc.getSeqNum() > loc.getSeqNum() + || oldLoc.getServerName().equals(loc.getServerName())) { + if (LOG.isTraceEnabled()) { + LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc + + " is newer than us or has the same server name"); + } + return false; + } + return loc == tableCache.cache.compute(startKey, (k, oldValue) -> { + if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) { + return loc; + } + if (LOG.isTraceEnabled()) { + LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue + + " is newer than us or has the same server name." 
+ + " Maybe it is updated before we replace it"); + } + return oldValue; + }); + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "Called by lambda expression") + private void addToCache(HRegionLocation loc) { + addToCache(getTableCache(loc.getRegionInfo().getTable()), loc); + if (LOG.isTraceEnabled()) { + LOG.trace("Try adding " + loc + " to cache"); + } + } + + private boolean tryComplete(LocateRequest req, CompletableFuture future, + HRegionLocation loc) { + if (future.isDone()) { + return true; + } + boolean completed; + if (req.locateToPrevious) { + completed = Bytes.equals(loc.getRegionInfo().getEndKey(), req.row); + } else { + completed = loc.getRegionInfo().containsRow(req.row); + } + if (completed) { + future.complete(loc); + return true; + } else { + return false; + } + } + + private void complete(TableName tableName, LocateRequest req, HRegionLocation loc, + Throwable error, String rowNameInErrorMsg) { + if (error != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to locate region in '" + tableName + "', " + rowNameInErrorMsg + "='" + + Bytes.toStringBinary(req.row) + "'", + error); + } + } + LocateRequest toSend = null; + TableCache tableCache = getTableCache(tableName); + if (loc != null) { + if (!addToCache(tableCache, loc)) { + // someone is ahead of us. + synchronized (tableCache) { + tableCache.pendingRequests.remove(req); + } + return; + } + } + synchronized (tableCache) { + tableCache.pendingRequests.remove(req); + if (error instanceof DoNotRetryIOException) { + CompletableFuture future = tableCache.allRequests.remove(req); + if (future != null) { + future.completeExceptionally(error); + } + } + if (loc != null) { + for (Iterator>> iter = + tableCache.allRequests.entrySet().iterator(); iter.hasNext();) { + Map.Entry> entry = iter.next(); + if (tryComplete(entry.getKey(), entry.getValue(), loc)) { + iter.remove(); + } + } + } + if (!tableCache.allRequests.isEmpty() + && tableCache.hasQuota(maxConcurrentLocateRequestPerTable)) { + LocateRequest[] candidates = tableCache.allRequests.keySet().stream() + .filter(r -> !tableCache.isPending(r)).toArray(LocateRequest[]::new); + if (candidates.length > 0) { + // TODO: use a better algorithm to send a request which is more likely to fetch a new + // location. 
+ toSend = candidates[ThreadLocalRandom.current().nextInt(candidates.length)]; + } + } + } + if (toSend != null) { + if (toSend.locateToPrevious) { + locatePreviousInMeta(tableName, toSend); + } else { + locateInMeta(tableName, toSend); + } + } + } + + private void onScanComplete(TableName tableName, LocateRequest req, List results, + Throwable error, String rowNameInErrorMsg) { + if (error != null) { + complete(tableName, req, null, error, rowNameInErrorMsg); + return; + } + if (results.isEmpty()) { + complete(tableName, req, null, new TableNotFoundException(tableName), rowNameInErrorMsg); + return; + } + RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0)); + if (LOG.isDebugEnabled()) { + LOG.debug("The fetched location of '" + tableName + "', " + rowNameInErrorMsg + "='" + + Bytes.toStringBinary(req.row) + "' is " + locs); + } + if (locs == null || locs.getDefaultRegionLocation() == null) { + complete(tableName, req, null, + new IOException(String.format("No location found for '%s', %s='%s'", tableName, + rowNameInErrorMsg, Bytes.toStringBinary(req.row))), + rowNameInErrorMsg); + return; + } + HRegionLocation loc = locs.getDefaultRegionLocation(); + HRegionInfo info = loc.getRegionInfo(); + if (info == null) { + complete(tableName, req, null, + new IOException(String.format("HRegionInfo is null for '%s', %s='%s'", tableName, + rowNameInErrorMsg, Bytes.toStringBinary(req.row))), + rowNameInErrorMsg); + return; + } + if (!info.getTable().equals(tableName)) { + complete(tableName, req, null, + new TableNotFoundException( + "Table '" + tableName + "' was not found, got: '" + info.getTable() + "'"), + rowNameInErrorMsg); + return; + } + if (info.isSplit()) { + complete(tableName, req, null, + new RegionOfflineException( + "the only available region for the required row is a split parent," + + " the daughters should be online soon: '" + info.getRegionNameAsString() + "'"), + rowNameInErrorMsg); + return; + } + if (info.isOffline()) { + complete(tableName, req, null, + new RegionOfflineException("the region is offline, could" + + " be caused by a disable table call: '" + info.getRegionNameAsString() + "'"), + rowNameInErrorMsg); + return; + } + if (loc.getServerName() == null) { + complete(tableName, req, null, + new NoServerForRegionException( + String.format("No server address listed for region '%s', %s='%s'", + info.getRegionNameAsString(), rowNameInErrorMsg, Bytes.toStringBinary(req.row))), + rowNameInErrorMsg); + return; + } + if (req.locateToPrevious && !Bytes.equals(info.getEndKey(), req.row)) { + complete(tableName, req, null, + new DoNotRetryIOException("The end key of '" + info.getRegionNameAsString() + "' is '" + + Bytes.toStringBinary(info.getEndKey()) + "', expected '" + + Bytes.toStringBinary(req.row) + "'"), + rowNameInErrorMsg); + return; + } + complete(tableName, req, loc, null, rowNameInErrorMsg); + } + + private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row) { + Map.Entry entry = tableCache.cache.floorEntry(row); + if (entry == null) { + return null; + } + HRegionLocation loc = entry.getValue(); + byte[] endKey = loc.getRegionInfo().getEndKey(); + if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" + + Bytes.toStringBinary(row) + "'"); + } + return loc; + } else { + return null; + } + } + + private HRegionLocation locatePreviousInCache(TableCache tableCache, TableName tableName, + byte[] 
startRowOfCurrentRegion) { + Map.Entry entry; + if (isEmptyStopRow(startRowOfCurrentRegion)) { + entry = tableCache.cache.lastEntry(); + } else { + entry = tableCache.cache.lowerEntry(startRowOfCurrentRegion); + } + if (entry == null) { + return null; + } + HRegionLocation loc = entry.getValue(); + if (Bytes.equals(loc.getRegionInfo().getEndKey(), startRowOfCurrentRegion)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Found " + loc + " in cache for '" + tableName + "', startRowOfCurrentRegion='" + + Bytes.toStringBinary(startRowOfCurrentRegion) + "'"); + } + return loc; + } else { + return null; + } + } + + private void locateInMeta(TableName tableName, LocateRequest req) { + if (LOG.isTraceEnabled()) { + LOG.trace( + "Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) + "' in meta"); + } + byte[] metaKey = createRegionName(tableName, req.row, NINES, false); + conn.getRawTable(META_TABLE_NAME) + .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1) + .whenComplete((results, error) -> onScanComplete(tableName, req, results, error, "row")); + } + + private void locatePreviousInMeta(TableName tableName, LocateRequest req) { + if (LOG.isTraceEnabled()) { + LOG.trace("Try locate '" + tableName + "', startRowOfCurrentRegion='" + + Bytes.toStringBinary(req.row) + "' in meta"); + } + byte[] metaKey; + if (isEmptyStopRow(req.row)) { + byte[] binaryTableName = tableName.getName(); + metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1); + } else { + metaKey = createRegionName(tableName, req.row, ZEROES, false); + } + conn.getRawTable(META_TABLE_NAME) + .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1) + .whenComplete((results, error) -> onScanComplete(tableName, req, results, error, + "startRowOfCurrentRegion")); + } + + private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row, + boolean locateToPrevious) { + return locateToPrevious ? locatePreviousInCache(tableCache, tableName, row) + : locateInCache(tableCache, tableName, row); + } + + // locateToPrevious is true means we will use the start key of a region to locate the region + // placed before it. Used for reverse scan. See the comment of + // AsyncRegionLocator.getPreviousRegionLocation. 
+ private CompletableFuture getRegionLocation(TableName tableName, byte[] row, + boolean locateToPrevious) { + TableCache tableCache = getTableCache(tableName); + HRegionLocation loc = locateInCache(tableCache, tableName, row, locateToPrevious); + if (loc != null) { + return CompletableFuture.completedFuture(loc); + } + CompletableFuture future; + LocateRequest req; + boolean sendRequest = false; + synchronized (tableCache) { + // check again + loc = locateInCache(tableCache, tableName, row, locateToPrevious); + if (loc != null) { + return CompletableFuture.completedFuture(loc); + } + req = new LocateRequest(row, locateToPrevious); + future = tableCache.allRequests.get(req); + if (future == null) { + future = new CompletableFuture<>(); + tableCache.allRequests.put(req, future); + if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) { + tableCache.send(req); + sendRequest = true; + } + } + } + if (sendRequest) { + if (locateToPrevious) { + locatePreviousInMeta(tableName, req); + } else { + locateInMeta(tableName, req); + } + } + return future; + } + + CompletableFuture getRegionLocation(TableName tableName, byte[] row) { + return getRegionLocation(tableName, row, false); + } + + // Used for reverse scan. See the comment of AsyncRegionLocator.getPreviousRegionLocation. + // TODO: need to deal with region merge where the startRowOfCurrentRegion will not be the endRow + // of a region. + CompletableFuture getPreviousRegionLocation(TableName tableName, + byte[] startRowOfCurrentRegion) { + return getRegionLocation(tableName, startRowOfCurrentRegion, true); + } + + void updateCachedLocation(HRegionLocation loc, Throwable exception) { + updateCachedLoation(loc, exception, l -> { + TableCache tableCache = cache.get(l.getRegionInfo().getTable()); + if (tableCache == null) { + return null; + } + return tableCache.cache.get(l.getRegionInfo().getStartKey()); + }, this::addToCache, this::removeFromCache); + } + + void clearCache(TableName tableName) { + TableCache tableCache = cache.remove(tableName); + if (tableCache == null) { + return; + } + synchronized (tableCache) { + if (!tableCache.allRequests.isEmpty()) { + IOException error = new IOException("Cache cleared"); + tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error)); + } + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index ae8f2a26137..1c3569ab7dc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -17,42 +17,23 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.HConstants.CATALOG_FAMILY; -import static org.apache.hadoop.hbase.HConstants.NINES; -import static org.apache.hadoop.hbase.HConstants.ZEROES; -import static org.apache.hadoop.hbase.HRegionInfo.createRegionName; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; -import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException; import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException; -import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; -import java.io.IOException; -import 
java.util.Arrays; -import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; @@ -66,346 +47,73 @@ class AsyncRegionLocator { private static final Log LOG = LogFactory.getLog(AsyncRegionLocator.class); - private final AsyncConnectionImpl conn; - private final HashedWheelTimer retryTimer; - private final AtomicReference metaRegionLocation = new AtomicReference<>(); + private final AsyncMetaRegionLocator metaRegionLocator; - private final AtomicReference> metaRelocateFuture = - new AtomicReference<>(); - - private final ConcurrentMap> cache = - new ConcurrentHashMap<>(); + private final AsyncNonMetaRegionLocator nonMetaRegionLocator; AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) { - this.conn = conn; + this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry); + this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn); this.retryTimer = retryTimer; } - private CompletableFuture locateMetaRegion() { - for (;;) { - HRegionLocation metaRegionLocation = this.metaRegionLocation.get(); - if (metaRegionLocation != null) { - return CompletableFuture.completedFuture(metaRegionLocation); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Meta region location cache is null, try fetching from registry."); - } - if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Start fetching meta region location from registry."); - } - CompletableFuture future = metaRelocateFuture.get(); - conn.registry.getMetaRegionLocation().whenComplete((locs, error) -> { - if (error != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to fetch meta region location from registry", error); - } - metaRelocateFuture.getAndSet(null).completeExceptionally(error); - return; - } - HRegionLocation loc = locs.getDefaultRegionLocation(); - if (LOG.isDebugEnabled()) { - LOG.debug("The fetched meta region location is " + loc); - } - // Here we update cache before reset future, so it is possible that someone can get a - // stale value. Consider this: - // 1. update cache - // 2. someone clear the cache and relocate again - // 3. the metaRelocateFuture is not null so the old future is used. - // 4. we clear metaRelocateFuture and complete the future in it with the value being - // cleared in step 2. - // But we do not think it is a big deal as it rarely happens, and even if it happens, the - // caller will retry again later, no correctness problems. 
- this.metaRegionLocation.set(loc); - metaRelocateFuture.set(null); - future.complete(loc); - }); - } else { - CompletableFuture future = metaRelocateFuture.get(); - if (future != null) { - return future; - } - } - } - } - - private static ConcurrentNavigableMap createTableCache() { - return new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); - } - - private void removeFromCache(HRegionLocation loc) { - ConcurrentNavigableMap tableCache = - cache.get(loc.getRegionInfo().getTable()); - if (tableCache == null) { - return; - } - tableCache.computeIfPresent(loc.getRegionInfo().getStartKey(), (k, oldLoc) -> { - if (oldLoc.getSeqNum() > loc.getSeqNum() - || !oldLoc.getServerName().equals(loc.getServerName())) { - return oldLoc; - } - return null; - }); - } - - private void addToCache(HRegionLocation loc) { - if (LOG.isTraceEnabled()) { - LOG.trace("Try adding " + loc + " to cache"); - } - ConcurrentNavigableMap tableCache = computeIfAbsent(cache, - loc.getRegionInfo().getTable(), AsyncRegionLocator::createTableCache); - byte[] startKey = loc.getRegionInfo().getStartKey(); - HRegionLocation oldLoc = tableCache.putIfAbsent(startKey, loc); - if (oldLoc == null) { - return; - } - if (oldLoc.getSeqNum() > loc.getSeqNum() - || oldLoc.getServerName().equals(loc.getServerName())) { - if (LOG.isTraceEnabled()) { - LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc - + " is newer than us or has the same server name"); - } - return; - } - tableCache.compute(startKey, (k, oldValue) -> { - if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) { - return loc; - } - if (LOG.isTraceEnabled()) { - LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue - + " is newer than us or has the same server name." - + " Maybe it is updated before we replace it"); - } - return oldValue; - }); - } - - private HRegionLocation locateInCache(TableName tableName, byte[] row) { - ConcurrentNavigableMap tableCache = cache.get(tableName); - if (tableCache == null) { - return null; - } - Map.Entry entry = tableCache.floorEntry(row); - if (entry == null) { - return null; - } - HRegionLocation loc = entry.getValue(); - byte[] endKey = loc.getRegionInfo().getEndKey(); - if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) { - return loc; - } else { - return null; - } - } - - private void onScanComplete(CompletableFuture future, TableName tableName, - byte[] row, List results, Throwable error, String rowNameInErrorMsg, - Consumer otherCheck) { - if (error != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to fetch location of '" + tableName + "', " + rowNameInErrorMsg + "='" - + Bytes.toStringBinary(row) + "'", - error); - } - future.completeExceptionally(error); - return; - } - if (results.isEmpty()) { - future.completeExceptionally(new TableNotFoundException(tableName)); - return; - } - RegionLocations locs = MetaTableAccessor.getRegionLocations(results.get(0)); - if (LOG.isDebugEnabled()) { - LOG.debug("The fetched location of '" + tableName + "', " + rowNameInErrorMsg + "='" - + Bytes.toStringBinary(row) + "' is " + locs); - } - if (locs == null || locs.getDefaultRegionLocation() == null) { - future.completeExceptionally( - new IOException(String.format("No location found for '%s', %s='%s'", tableName, - rowNameInErrorMsg, Bytes.toStringBinary(row)))); - return; - } - HRegionLocation loc = locs.getDefaultRegionLocation(); - HRegionInfo info = loc.getRegionInfo(); - if (info == null) { - future.completeExceptionally( - new 
IOException(String.format("HRegionInfo is null for '%s', %s='%s'", tableName, - rowNameInErrorMsg, Bytes.toStringBinary(row)))); - return; - } - if (!info.getTable().equals(tableName)) { - future.completeExceptionally(new TableNotFoundException( - "Table '" + tableName + "' was not found, got: '" + info.getTable() + "'")); - return; - } - if (info.isSplit()) { - future.completeExceptionally(new RegionOfflineException( - "the only available region for the required row is a split parent," - + " the daughters should be online soon: '" + info.getRegionNameAsString() + "'")); - return; - } - if (info.isOffline()) { - future.completeExceptionally(new RegionOfflineException("the region is offline, could" - + " be caused by a disable table call: '" + info.getRegionNameAsString() + "'")); - return; - } - if (loc.getServerName() == null) { - future.completeExceptionally(new NoServerForRegionException( - String.format("No server address listed for region '%s', %s='%s'", - info.getRegionNameAsString(), rowNameInErrorMsg, Bytes.toStringBinary(row)))); - return; - } - otherCheck.accept(loc); - addToCache(loc); - future.complete(loc); - } - - private CompletableFuture locateInMeta(TableName tableName, byte[] row) { - if (LOG.isTraceEnabled()) { - LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(row) + "' in meta"); - } - CompletableFuture future = new CompletableFuture<>(); - byte[] metaKey = createRegionName(tableName, row, NINES, false); - conn.getRawTable(META_TABLE_NAME) - .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1) - .whenComplete( - (results, error) -> onScanComplete(future, tableName, row, results, error, "row", loc -> { - })); - return future; - } - - private CompletableFuture locateRegion(TableName tableName, byte[] row) { - HRegionLocation loc = locateInCache(tableName, row); - if (loc != null) { - if (LOG.isTraceEnabled()) { - LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" - + Bytes.toStringBinary(row) + "'"); - } - return CompletableFuture.completedFuture(loc); - } - return locateInMeta(tableName, row); - } - private CompletableFuture withTimeout(CompletableFuture future, long timeoutNs, Supplier timeoutMsg) { if (future.isDone() || timeoutNs <= 0) { return future; } - CompletableFuture timeoutFuture = new CompletableFuture<>(); - Timeout timeoutTask = retryTimer.newTimeout( - t -> timeoutFuture.completeExceptionally(new TimeoutIOException(timeoutMsg.get())), timeoutNs, - TimeUnit.NANOSECONDS); - future.whenComplete((loc, error) -> { - timeoutTask.cancel(); - if (error != null) { - timeoutFuture.completeExceptionally(error); - } else { - timeoutFuture.complete(loc); + Timeout timeoutTask = retryTimer.newTimeout(t -> { + if (future.isDone()) { + return; + } + future.completeExceptionally(new TimeoutIOException(timeoutMsg.get())); + }, timeoutNs, TimeUnit.NANOSECONDS); + return future.whenComplete((loc, error) -> { + if (error.getClass() != TimeoutIOException.class) { + // cancel timeout task if we are not completed by it. + timeoutTask.cancel(); } }); - return timeoutFuture; } CompletableFuture getRegionLocation(TableName tableName, byte[] row, long timeoutNs) { CompletableFuture future = - tableName.equals(META_TABLE_NAME) ? locateMetaRegion() : locateRegion(tableName, row); + tableName.equals(META_TABLE_NAME) ? 
metaRegionLocator.getRegionLocation() + : nonMetaRegionLocator.getRegionLocation(tableName, row); return withTimeout(future, timeoutNs, () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + "ms) waiting for region location for " + tableName + ", row='" + Bytes.toStringBinary(row) + "'"); } - private HRegionLocation locatePreviousInCache(TableName tableName, - byte[] startRowOfCurrentRegion) { - ConcurrentNavigableMap tableCache = cache.get(tableName); - if (tableCache == null) { - return null; - } - Map.Entry entry; - if (isEmptyStopRow(startRowOfCurrentRegion)) { - entry = tableCache.lastEntry(); - } else { - entry = tableCache.lowerEntry(startRowOfCurrentRegion); - } - if (entry == null) { - return null; - } - HRegionLocation loc = entry.getValue(); - if (Bytes.equals(loc.getRegionInfo().getEndKey(), startRowOfCurrentRegion)) { - return loc; - } else { - return null; - } - } - - private CompletableFuture locatePreviousInMeta(TableName tableName, - byte[] startRowOfCurrentRegion) { - if (LOG.isTraceEnabled()) { - LOG.trace("Try locate '" + tableName + "', startRowOfCurrentRegion='" - + Bytes.toStringBinary(startRowOfCurrentRegion) + "' in meta"); - } - byte[] metaKey; - if (isEmptyStopRow(startRowOfCurrentRegion)) { - byte[] binaryTableName = tableName.getName(); - metaKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1); - } else { - metaKey = createRegionName(tableName, startRowOfCurrentRegion, ZEROES, false); - } - CompletableFuture future = new CompletableFuture<>(); - conn.getRawTable(META_TABLE_NAME) - .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1) - .whenComplete((results, error) -> onScanComplete(future, tableName, startRowOfCurrentRegion, - results, error, "startRowOfCurrentRegion", loc -> { - HRegionInfo info = loc.getRegionInfo(); - if (!Bytes.equals(info.getEndKey(), startRowOfCurrentRegion)) { - future.completeExceptionally(new IOException("The end key of '" - + info.getRegionNameAsString() + "' is '" + Bytes.toStringBinary(info.getEndKey()) - + "', expected '" + Bytes.toStringBinary(startRowOfCurrentRegion) + "'")); - } - })); - return future; - } - - private CompletableFuture locatePreviousRegion(TableName tableName, - byte[] startRowOfCurrentRegion) { - HRegionLocation loc = locatePreviousInCache(tableName, startRowOfCurrentRegion); - if (loc != null) { - if (LOG.isTraceEnabled()) { - LOG.trace("Found " + loc + " in cache for '" + tableName + "', startRowOfCurrentRegion='" - + Bytes.toStringBinary(startRowOfCurrentRegion) + "'"); - } - return CompletableFuture.completedFuture(loc); - } - return locatePreviousInMeta(tableName, startRowOfCurrentRegion); - } - /** - * Locate the previous region using the current regions start key. Used for reverse scan. - *

- * TODO: need to deal with region merge where the startRowOfCurrentRegion will not be the endRow - * of a region. + * Locate the previous region using the current regions start key. Used for reverse scan as the + * end key is not included in a region so we need to treat it differently. */ CompletableFuture getPreviousRegionLocation(TableName tableName, byte[] startRowOfCurrentRegion, long timeoutNs) { - CompletableFuture future = tableName.equals(META_TABLE_NAME) - ? locateMetaRegion() : locatePreviousRegion(tableName, startRowOfCurrentRegion); + // meta region can not be split right now so we call the same method as getRegionLocation. + // Change it later if the meta table can have more than one regions. + CompletableFuture future = + tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation() + : nonMetaRegionLocator.getPreviousRegionLocation(tableName, startRowOfCurrentRegion); return withTimeout(future, timeoutNs, () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + "ms) waiting for region location for " + tableName + ", startRowOfCurrentRegion='" + Bytes.toStringBinary(startRowOfCurrentRegion) + "'"); } - private boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) { + static boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) { // Do not need to update if no such location, or the location is newer, or the location is not // same with us return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() && oldLoc.getServerName().equals(loc.getServerName()); } - private void updateCachedLoation(HRegionLocation loc, Throwable exception, + static void updateCachedLoation(HRegionLocation loc, Throwable exception, Function cachedLocationSupplier, Consumer addToCache, Consumer removeFromCache) { HRegionLocation oldLoc = cachedLocationSupplier.apply(loc); @@ -445,34 +153,9 @@ class AsyncRegionLocator { void updateCachedLocation(HRegionLocation loc, Throwable exception) { if (loc.getRegionInfo().isMetaTable()) { - updateCachedLoation(loc, exception, l -> metaRegionLocation.get(), newLoc -> { - for (;;) { - HRegionLocation oldLoc = metaRegionLocation.get(); - if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum() - || oldLoc.getServerName().equals(newLoc.getServerName()))) { - return; - } - if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) { - return; - } - } - }, l -> { - for (;;) { - HRegionLocation oldLoc = metaRegionLocation.get(); - if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) { - return; - } - } - }); + metaRegionLocator.updateCachedLocation(loc, exception); } else { - updateCachedLoation(loc, exception, l -> { - ConcurrentNavigableMap tableCache = - cache.get(l.getRegionInfo().getTable()); - if (tableCache == null) { - return null; - } - return tableCache.get(l.getRegionInfo().getStartKey()); - }, this::addToCache, this::removeFromCache); + nonMetaRegionLocator.updateCachedLocation(loc, exception); } } @@ -480,6 +163,10 @@ class AsyncRegionLocator { if (LOG.isDebugEnabled()) { LOG.debug("Clear meta cache for " + tableName); } - cache.remove(tableName); + if (tableName.equals(META_TABLE_NAME)) { + metaRegionLocator.clearCache(); + } else { + nonMetaRegionLocator.clearCache(tableName); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java index fe988aa99b5..d24501d6caa 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java @@ -38,6 +38,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; @@ -97,7 +98,7 @@ public class TestAsyncGetMultiThread { @AfterClass public static void tearDown() throws Exception { - CONN.close(); + IOUtils.closeQuietly(CONN); TEST_UTIL.shutdownMiniCluster(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java similarity index 92% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java index a6791928a3b..f3aa26b9530 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.IntStream; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -50,7 +51,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; @Category({ MediumTests.class, ClientTests.class }) -public class TestAsyncRegionLocator { +public class TestAsyncNonMetaRegionLocator { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -60,7 +61,7 @@ public class TestAsyncRegionLocator { private static AsyncConnectionImpl CONN; - private static AsyncRegionLocator LOCATOR; + private static AsyncNonMetaRegionLocator LOCATOR; private static byte[][] SPLIT_KEYS; @@ -69,7 +70,7 @@ public class TestAsyncRegionLocator { TEST_UTIL.startMiniCluster(3); TEST_UTIL.getAdmin().setBalancerRunning(false, true); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent()); - LOCATOR = CONN.getLocator(); + LOCATOR = new AsyncNonMetaRegionLocator(CONN); SPLIT_KEYS = new byte[8][]; for (int i = 111; i < 999; i += 111) { SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); @@ -78,7 +79,7 @@ public class TestAsyncRegionLocator { @AfterClass public static void tearDown() throws Exception { - CONN.close(); + IOUtils.closeQuietly(CONN); TEST_UTIL.shutdownMiniCluster(); } @@ -102,12 +103,12 @@ public class TestAsyncRegionLocator { @Test public void testNoTable() throws InterruptedException { try { - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get(); + LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); } try { - LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW, 0L).get(); + LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); } @@ -118,12 +119,12 @@ public class TestAsyncRegionLocator { 
createSingleRegionTable(); TEST_UTIL.getAdmin().disableTable(TABLE_NAME); try { - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get(); + LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); } try { - LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW, 0L).get(); + LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_END_ROW).get(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); } @@ -143,17 +144,17 @@ public class TestAsyncRegionLocator { createSingleRegionTable(); ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get()); + LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get()); + LOCATOR.getPreviousRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)]; ThreadLocalRandom.current().nextBytes(randKey); assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getRegionLocation(TABLE_NAME, randKey, 0L).get()); + LOCATOR.getRegionLocation(TABLE_NAME, randKey).get()); // Use a key which is not the endKey of a region will cause error try { assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }, 0L).get()); + LOCATOR.getPreviousRegionLocation(TABLE_NAME, new byte[] { 1 }).get()); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(IOException.class)); assertTrue(e.getCause().getMessage().contains("end key of")); @@ -193,7 +194,7 @@ public class TestAsyncRegionLocator { IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> { try { assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1], - serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], 0L).get()); + serverNames[i], LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i]).get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -204,7 +205,7 @@ public class TestAsyncRegionLocator { n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> { try { assertLocEquals(i == 0 ? 
EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i], - LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i], 0L).get()); + LOCATOR.getPreviousRegionLocation(TABLE_NAME, endKeys[i]).get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -215,7 +216,7 @@ public class TestAsyncRegionLocator { public void testRegionMove() throws IOException, InterruptedException, ExecutionException { createSingleRegionTable(); ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); - HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get(); + HRegionLocation loc = LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get(); assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc); ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)) @@ -228,12 +229,12 @@ public class TestAsyncRegionLocator { Thread.sleep(100); } // Should be same as it is in cache - assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get()); + assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); LOCATOR.updateCachedLocation(loc, null); // null error will not trigger a cache cleanup - assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get()); + assertSame(loc, LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); LOCATOR.updateCachedLocation(loc, new NotServingRegionException()); assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 0L).get()); + LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW).get()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java new file mode 100644 index 00000000000..e82703b87f5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static java.util.stream.Collectors.toCollection; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; +import static org.apache.hadoop.hbase.client.AsyncNonMetaRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE; +import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; +import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; +import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncNonMetaRegionLocatorConcurrenyLimit { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static AsyncConnectionImpl CONN; + + private static AsyncNonMetaRegionLocator LOCATOR; + + private static byte[][] SPLIT_KEYS; + + private static int MAX_ALLOWED = 2; + + private static AtomicInteger CONCURRENCY = new AtomicInteger(0); + + private static AtomicInteger MAX_CONCURRENCY = new AtomicInteger(0); + + public static final class CountingRegionObserver extends BaseRegionObserver { + + @Override + public RegionScanner preScannerOpen(ObserverContext e, Scan scan, + RegionScanner s) throws IOException { + if (e.getEnvironment().getRegionInfo().isMetaTable()) { + int concurrency = CONCURRENCY.incrementAndGet(); + for (;;) { + int max = MAX_CONCURRENCY.get(); + if (concurrency <= max) { + break; + } + if (MAX_CONCURRENCY.compareAndSet(max, concurrency)) { + break; + } + } + Threads.sleepWithoutInterrupt(10); + } + return s; + } + + @Override + public void postScannerClose(ObserverContext e, InternalScanner s) + throws IOException { + if (e.getEnvironment().getRegionInfo().isMetaTable()) { + CONCURRENCY.decrementAndGet(); + } + } + } + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(REGION_COPROCESSOR_CONF_KEY, CountingRegionObserver.class.getName()); + conf.setInt(MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, 
MAX_ALLOWED); + TEST_UTIL.startMiniCluster(3); + TEST_UTIL.getAdmin().setBalancerRunning(false, true); + CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent()); + LOCATOR = new AsyncNonMetaRegionLocator(CONN); + SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i))) + .toArray(byte[][]::new); + TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + } + + @AfterClass + public static void tearDown() throws Exception { + IOUtils.closeQuietly(CONN); + TEST_UTIL.shutdownMiniCluster(); + } + + private void assertLocs(List> futures) + throws InterruptedException, ExecutionException { + assertEquals(256, futures.size()); + for (int i = 0; i < futures.size(); i++) { + HRegionLocation loc = futures.get(i).get(); + if (i == 0) { + assertTrue(isEmptyStartRow(loc.getRegionInfo().getStartKey())); + } else { + assertEquals(String.format("%02x", i), Bytes.toString(loc.getRegionInfo().getStartKey())); + } + if (i == futures.size() - 1) { + assertTrue(isEmptyStopRow(loc.getRegionInfo().getEndKey())); + } else { + assertEquals(String.format("%02x", i + 1), Bytes.toString(loc.getRegionInfo().getEndKey())); + } + } + } + + @Test + public void test() throws InterruptedException, ExecutionException { + List> futures = IntStream.range(0, 128) + .mapToObj(i -> Bytes.toBytes(String.format("%02x", i))) + .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r)).collect(toCollection(ArrayList::new)); + futures.addAll(IntStream.range(129, 257) + .mapToObj(i -> i < 256 ? Bytes.toBytes(String.format("%02x", i)) : EMPTY_START_ROW) + .map(r -> LOCATOR.getPreviousRegionLocation(TABLE_NAME, r)).collect(toList())); + assertLocs(futures); + assertTrue(MAX_CONCURRENCY.get() <= MAX_ALLOWED); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java index 2a902a6994d..40190cbf0aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionLocation; @@ -90,7 +91,7 @@ public class TestAsyncRegionLocatorTimeout { @AfterClass public static void tearDown() throws Exception { - CONN.close(); + IOUtils.closeQuietly(CONN); TEST_UTIL.shutdownMiniCluster(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index 7a8572778b0..bb6cc2d34bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.IntStream; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -103,7 +104,7 @@ public class TestAsyncTable { @AfterClass public static void 
tearDownAfterClass() throws Exception { - ASYNC_CONN.close(); + IOUtils.closeQuietly(ASYNC_CONN); TEST_UTIL.shutdownMiniCluster(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java index c8e1c7ac679..ea999f90e77 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; import java.util.concurrent.ExecutionException; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.security.User; @@ -88,7 +89,7 @@ public class TestAsyncTableNoncedRetry { @AfterClass public static void tearDownAfterClass() throws Exception { - ASYNC_CONN.close(); + IOUtils.closeQuietly(ASYNC_CONN); TEST_UTIL.shutdownMiniCluster(); }
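
A minimal usage sketch (not part of the patch) for reviewers who want to exercise the new locator outside the unit tests. The class name LocatorUsageSketch, the table name "async" and the row key "row-001" are placeholders; the sketch has to live in the org.apache.hadoop.hbase.client package because AsyncConnectionImpl and AsyncNonMetaRegionLocator are package-private, and it simply mirrors the setup used by TestAsyncNonMetaRegionLocator above.

package org.apache.hadoop.hbase.client; // both classes used below are package-private

import java.util.concurrent.CompletableFuture;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;

public class LocatorUsageSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // New knob introduced by this patch: cap the number of concurrent meta scans issued
    // per table when filling the location cache (default is 8).
    conf.setInt(AsyncNonMetaRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, 4);

    AsyncConnectionImpl conn = new AsyncConnectionImpl(conf, User.getCurrent());
    AsyncNonMetaRegionLocator locator = new AsyncNonMetaRegionLocator(conn);

    // Placeholder table and row, purely for illustration.
    CompletableFuture<HRegionLocation> future =
        locator.getRegionLocation(TableName.valueOf("async"), Bytes.toBytes("row-001"));
    // Block for the demo; real callers would chain callbacks instead.
    HRegionLocation loc = future.get();
    System.out.println("row-001 is served by " + loc.getServerName());

    IOUtils.closeQuietly(conn); // same shutdown pattern the tests switch to in this patch
  }
}

The per-table cap exists so that a burst of cache misses against one table does not turn into an unbounded number of parallel reverse scans against hbase:meta; pending lookups are parked in TableCache.allRequests and completed from whichever scan result covers their row first.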