HBASE-21726 Add getAllRegionLocations method to AsyncTableRegionLocator

This commit is contained in:
Duo Zhang 2019-01-17 17:42:00 +08:00 committed by zhangduo
parent 3d23490e88
commit 882bd564f4
4 changed files with 217 additions and 5 deletions

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
@ -39,7 +40,7 @@ public interface AsyncTableRegionLocator {
/**
* Finds the region on which the given row is being served. Does not reload the cache.
* <p>
* <p/>
* Returns the location of the region to which the row belongs.
* @param row Row to find.
*/
@ -49,7 +50,7 @@ public interface AsyncTableRegionLocator {
/**
* Finds the region on which the given row is being served.
* <p>
* <p/>
* Returns the location of the region to which the row belongs.
* @param row Row to find.
* @param reload true to reload information or false to use cached information
@ -60,7 +61,7 @@ public interface AsyncTableRegionLocator {
/**
* Finds the region with the given <code>replicaId</code> on which the given row is being served.
* <p>
* <p/>
* Returns the location of the region with the given <code>replicaId</code> to which the row
* belongs.
* @param row Row to find.
@ -72,7 +73,7 @@ public interface AsyncTableRegionLocator {
/**
* Finds the region with the given <code>replicaId</code> on which the given row is being served.
* <p>
* <p/>
* Returns the location of the region with the given <code>replicaId</code> to which the row
* belongs.
* @param row Row to find.
@ -80,4 +81,10 @@ public interface AsyncTableRegionLocator {
* @param reload true to reload information or false to use cached information
*/
CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, boolean reload);
/**
* Retrieves all of the regions associated with this table.
* @return a {@link List} of all regions associated with this table.
*/
CompletableFuture<List<HRegionLocation>> getAllRegionLocations();
}

View File

@ -17,12 +17,20 @@
*/
package org.apache.hadoop.hbase.client;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* The implementation of AsyncRegionLocator.
*/
@ -49,4 +57,64 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload,
-1L);
}
// this is used to prevent stack overflow if there are thousands of regions for the table. If the
// location is in cache, the CompletableFuture will be completed immediately inside the same
// thread, and then in the action we will call locate again, also in the same thread. If all the
// locations are in cache, and we do not use whenCompleteAsync to break the tie, the stack will be
// very very deep and cause stack overflow.
@VisibleForTesting
static final ThreadLocal<MutableInt> STACK_DEPTH = new ThreadLocal<MutableInt>() {
@Override
protected MutableInt initialValue() {
return new MutableInt(0);
}
};
@VisibleForTesting
static final int MAX_STACK_DEPTH = 16;
private void locate(CompletableFuture<List<HRegionLocation>> future,
ConcurrentLinkedQueue<HRegionLocation> result, byte[] row) {
BiConsumer<HRegionLocation, Throwable> listener = (loc, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
result.add(loc);
if (ConnectionUtils.isEmptyStartRow(loc.getRegion().getStartKey())) {
future.complete(result.stream()
.sorted((l1, l2) -> RegionInfo.COMPARATOR.compare(l1.getRegion(), l2.getRegion()))
.collect(Collectors.toList()));
} else {
locate(future, result, loc.getRegion().getStartKey());
}
};
MutableInt depth = STACK_DEPTH.get();
boolean async = depth.incrementAndGet() >= MAX_STACK_DEPTH;
try {
CompletableFuture<HRegionLocation> f =
locator.getRegionLocation(tableName, row, RegionLocateType.BEFORE, -1L);
if (async) {
FutureUtils.addListenerAsync(f, listener);
} else {
FutureUtils.addListener(f, listener);
}
} finally {
if (depth.decrementAndGet() == 0) {
STACK_DEPTH.remove();
}
}
}
@Override
public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
ConcurrentLinkedQueue<HRegionLocation> result = new ConcurrentLinkedQueue<>();
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
// start from end to start, as when locating we will do reverse scan, so we will prefetch the
// location of the regions before the current one.
locate(future, result, HConstants.EMPTY_END_ROW);
return future;
}
}

View File

@ -57,4 +57,23 @@ public final class FutureUtils {
}
});
}
/**
* Almost the same with the {@link #addListener(CompletableFuture, BiConsumer)} method above, the
* difference is that in this method we will call
* {@link CompletableFuture#whenCompleteAsync(BiConsumer)} instead of
* {@link CompletableFuture#whenComplete(BiConsumer)}.
* @see #addListener(CompletableFuture, BiConsumer)
*/
@SuppressWarnings("FutureReturnValueIgnored")
public static <T> void addListenerAsync(CompletableFuture<T> future,
BiConsumer<? super T, ? super Throwable> action) {
future.whenCompleteAsync((resp, error) -> {
try {
action.accept(resp, error);
} catch (Throwable t) {
LOG.error("Unexpected error caught when processing CompletableFuture", t);
}
});
}
}

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncTableRegionLocator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncTableRegionLocator.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("async");
private static AsyncConnection CONN;
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("cf"));
TEST_UTIL.waitTableAvailable(TABLE_NAME);
TEST_UTIL.getAdmin().balancerSwitch(false, true);
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
@AfterClass
public static void tearDown() throws Exception {
Closeables.close(CONN, true);
TEST_UTIL.shutdownMiniCluster();
}
private void assertLocEquals(Map<RegionInfo, ServerName> region2Loc)
throws InterruptedException, ExecutionException {
for (HRegionLocation loc : CONN.getRegionLocator(TABLE_NAME).getAllRegionLocations().get()) {
ServerName expected = region2Loc.remove(loc.getRegion());
assertNotNull(expected);
assertEquals(expected, loc.getServerName());
}
}
@Test
public void testGetAll() throws InterruptedException, ExecutionException {
Map<RegionInfo, ServerName> region2Loc = TEST_UTIL.getMiniHBaseCluster()
.getRegionServerThreads().stream().map(t -> t.getRegionServer())
.flatMap(rs -> rs.getRegions(TABLE_NAME).stream()
.map(r -> Pair.create(r.getRegionInfo(), rs.getServerName())))
.collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
MutableInt maxDepth = new MutableInt(0);
MutableInt depth = new MutableInt(0) {
private static final long serialVersionUID = 5887112211305087650L;
@Override
public int incrementAndGet() {
int val = super.incrementAndGet();
if (val > maxDepth.intValue()) {
maxDepth.setValue(val);
}
return val;
}
};
// first time, read from meta
AsyncTableRegionLocatorImpl.STACK_DEPTH.set(depth);
assertLocEquals(new HashMap<>(region2Loc));
assertTrue(maxDepth.intValue() > 0);
assertTrue(maxDepth.intValue() <= AsyncTableRegionLocatorImpl.MAX_STACK_DEPTH);
// second time, read from cache
maxDepth.setValue(0);
depth.setValue(0);
AsyncTableRegionLocatorImpl.STACK_DEPTH.set(depth);
assertLocEquals(new HashMap<>(region2Loc));
assertTrue(maxDepth.intValue() > 0);
assertTrue(maxDepth.intValue() <= AsyncTableRegionLocatorImpl.MAX_STACK_DEPTH);
}
}