diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java index 3bda38e9269..f67204a547d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; @@ -39,7 +40,7 @@ public interface AsyncTableRegionLocator { /** * Finds the region on which the given row is being served. Does not reload the cache. - *

+ *

* Returns the location of the region to which the row belongs. * @param row Row to find. */ @@ -49,7 +50,7 @@ public interface AsyncTableRegionLocator { /** * Finds the region on which the given row is being served. - *

+ *

* Returns the location of the region to which the row belongs. * @param row Row to find. * @param reload true to reload information or false to use cached information @@ -60,7 +61,7 @@ public interface AsyncTableRegionLocator { /** * Finds the region with the given replicaId on which the given row is being served. - *

+ *

* Returns the location of the region with the given replicaId to which the row * belongs. * @param row Row to find. @@ -72,7 +73,7 @@ public interface AsyncTableRegionLocator { /** * Finds the region with the given replicaId on which the given row is being served. - *

+ *

* Returns the location of the region with the given replicaId to which the row * belongs. * @param row Row to find. @@ -80,4 +81,10 @@ public interface AsyncTableRegionLocator { * @param reload true to reload information or false to use cached information */ CompletableFuture getRegionLocation(byte[] row, int replicaId, boolean reload); + + /** + * Retrieves all of the regions associated with this table. + * @return a {@link List} of all regions associated with this table. + */ + CompletableFuture> getAllRegionLocations(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java index 465a411f088..606ee7acef5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java @@ -17,12 +17,20 @@ */ package org.apache.hadoop.hbase.client; +import java.util.List; import java.util.concurrent.CompletableFuture; - +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + /** * The implementation of AsyncRegionLocator. */ @@ -49,4 +57,64 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator { return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload, -1L); } + + // this is used to prevent stack overflow if there are thousands of regions for the table. 
If the + // location is in cache, the CompletableFuture will be completed immediately inside the same + // thread, and then in the action we will call locate again, also in the same thread. If all the + // locations are in cache, and we do not use whenCompleteAsync to break the chain, the stack will + // become very deep and cause a stack overflow. + @VisibleForTesting + static final ThreadLocal STACK_DEPTH = new ThreadLocal() { + + @Override + protected MutableInt initialValue() { + return new MutableInt(0); + } + }; + + @VisibleForTesting + static final int MAX_STACK_DEPTH = 16; + + private void locate(CompletableFuture> future, + ConcurrentLinkedQueue result, byte[] row) { + BiConsumer listener = (loc, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + result.add(loc); + if (ConnectionUtils.isEmptyStartRow(loc.getRegion().getStartKey())) { + future.complete(result.stream() + .sorted((l1, l2) -> RegionInfo.COMPARATOR.compare(l1.getRegion(), l2.getRegion())) + .collect(Collectors.toList())); + } else { + locate(future, result, loc.getRegion().getStartKey()); + } + }; + MutableInt depth = STACK_DEPTH.get(); + boolean async = depth.incrementAndGet() >= MAX_STACK_DEPTH; + try { + CompletableFuture f = + locator.getRegionLocation(tableName, row, RegionLocateType.BEFORE, -1L); + if (async) { + FutureUtils.addListenerAsync(f, listener); + } else { + FutureUtils.addListener(f, listener); + } + } finally { + if (depth.decrementAndGet() == 0) { + STACK_DEPTH.remove(); + } + } + } + + @Override + public CompletableFuture> getAllRegionLocations() { + ConcurrentLinkedQueue result = new ConcurrentLinkedQueue<>(); + CompletableFuture> future = new CompletableFuture<>(); + // walk from the end of the table to the start, as locating does a reverse scan, so we will prefetch the + // location of the regions before the current one. 
+ locate(future, result, HConstants.EMPTY_END_ROW); + return future; + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java index 067e66b5ec4..6c3e026753b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java @@ -57,4 +57,23 @@ public final class FutureUtils { } }); } + + /** + * Almost the same as the {@link #addListener(CompletableFuture, BiConsumer)} method above; the + * difference is that this method calls + * {@link CompletableFuture#whenCompleteAsync(BiConsumer)} instead of + * {@link CompletableFuture#whenComplete(BiConsumer)}. + * @see #addListener(CompletableFuture, BiConsumer) + */ + @SuppressWarnings("FutureReturnValueIgnored") + public static void addListenerAsync(CompletableFuture future, + BiConsumer action) { + future.whenCompleteAsync((resp, error) -> { + try { + action.accept(resp, error); + } catch (Throwable t) { + LOG.error("Unexpected error caught when processing CompletableFuture", t); + } + }); + } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java new file mode 100644 index 00000000000..86520047b1c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionLocator.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.math3.util.Pair; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableRegionLocator { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncTableRegionLocator.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static AsyncConnection CONN; + + @BeforeClass + public static void setUp() throws Exception { + 
TEST_UTIL.startMiniCluster(3); + TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("cf")); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + TEST_UTIL.getAdmin().balancerSwitch(false, true); + CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + } + + @AfterClass + public static void tearDown() throws Exception { + Closeables.close(CONN, true); + TEST_UTIL.shutdownMiniCluster(); + } + + private void assertLocEquals(Map region2Loc) + throws InterruptedException, ExecutionException { + for (HRegionLocation loc : CONN.getRegionLocator(TABLE_NAME).getAllRegionLocations().get()) { + ServerName expected = region2Loc.remove(loc.getRegion()); + assertNotNull(expected); + assertEquals(expected, loc.getServerName()); + } + } + + @Test + public void testGetAll() throws InterruptedException, ExecutionException { + Map region2Loc = TEST_UTIL.getMiniHBaseCluster() + .getRegionServerThreads().stream().map(t -> t.getRegionServer()) + .flatMap(rs -> rs.getRegions(TABLE_NAME).stream() + .map(r -> Pair.create(r.getRegionInfo(), rs.getServerName()))) + .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond)); + MutableInt maxDepth = new MutableInt(0); + MutableInt depth = new MutableInt(0) { + + private static final long serialVersionUID = 5887112211305087650L; + + @Override + public int incrementAndGet() { + int val = super.incrementAndGet(); + if (val > maxDepth.intValue()) { + maxDepth.setValue(val); + } + return val; + } + }; + // first time, read from meta + AsyncTableRegionLocatorImpl.STACK_DEPTH.set(depth); + assertLocEquals(new HashMap<>(region2Loc)); + assertTrue(maxDepth.intValue() > 0); + assertTrue(maxDepth.intValue() <= AsyncTableRegionLocatorImpl.MAX_STACK_DEPTH); + + // second time, read from cache + maxDepth.setValue(0); + depth.setValue(0); + AsyncTableRegionLocatorImpl.STACK_DEPTH.set(depth); + assertLocEquals(new HashMap<>(region2Loc)); + assertTrue(maxDepth.intValue() > 0); + assertTrue(maxDepth.intValue() <= 
AsyncTableRegionLocatorImpl.MAX_STACK_DEPTH); + } +}