HBASE-21726 Add getAllRegionLocations method to AsyncTableRegionLocator
This commit is contained in:
parent
07484db1ff
commit
ff272e8683
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
@ -39,7 +40,7 @@ public interface AsyncTableRegionLocator {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Finds the region on which the given row is being served. Does not reload the cache.
|
* Finds the region on which the given row is being served. Does not reload the cache.
|
||||||
* <p>
|
* <p/>
|
||||||
* Returns the location of the region to which the row belongs.
|
* Returns the location of the region to which the row belongs.
|
||||||
* @param row Row to find.
|
* @param row Row to find.
|
||||||
*/
|
*/
|
||||||
|
@ -49,7 +50,7 @@ public interface AsyncTableRegionLocator {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Finds the region on which the given row is being served.
|
* Finds the region on which the given row is being served.
|
||||||
* <p>
|
* <p/>
|
||||||
* Returns the location of the region to which the row belongs.
|
* Returns the location of the region to which the row belongs.
|
||||||
* @param row Row to find.
|
* @param row Row to find.
|
||||||
* @param reload true to reload information or false to use cached information
|
* @param reload true to reload information or false to use cached information
|
||||||
|
@ -60,7 +61,7 @@ public interface AsyncTableRegionLocator {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Finds the region with the given <code>replicaId</code> on which the given row is being served.
|
* Finds the region with the given <code>replicaId</code> on which the given row is being served.
|
||||||
* <p>
|
* <p/>
|
||||||
* Returns the location of the region with the given <code>replicaId</code> to which the row
|
* Returns the location of the region with the given <code>replicaId</code> to which the row
|
||||||
* belongs.
|
* belongs.
|
||||||
* @param row Row to find.
|
* @param row Row to find.
|
||||||
|
@ -72,7 +73,7 @@ public interface AsyncTableRegionLocator {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Finds the region with the given <code>replicaId</code> on which the given row is being served.
|
* Finds the region with the given <code>replicaId</code> on which the given row is being served.
|
||||||
* <p>
|
* <p/>
|
||||||
* Returns the location of the region with the given <code>replicaId</code> to which the row
|
* Returns the location of the region with the given <code>replicaId</code> to which the row
|
||||||
* belongs.
|
* belongs.
|
||||||
* @param row Row to find.
|
* @param row Row to find.
|
||||||
|
@ -80,4 +81,10 @@ public interface AsyncTableRegionLocator {
|
||||||
* @param reload true to reload information or false to use cached information
|
* @param reload true to reload information or false to use cached information
|
||||||
*/
|
*/
|
||||||
CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, boolean reload);
|
CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int replicaId, boolean reload);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieves all of the regions associated with this table.
|
||||||
|
* @return a {@link List} of all regions associated with this table.
|
||||||
|
*/
|
||||||
|
CompletableFuture<List<HRegionLocation>> getAllRegionLocations();
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,12 +17,20 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import org.apache.commons.lang3.mutable.MutableInt;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The implementation of AsyncRegionLocator.
|
* The implementation of AsyncRegionLocator.
|
||||||
*/
|
*/
|
||||||
|
@ -49,4 +57,64 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
|
||||||
return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload,
|
return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload,
|
||||||
-1L);
|
-1L);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// this is used to prevent stack overflow if there are thousands of regions for the table. If the
|
||||||
|
// location is in cache, the CompletableFuture will be completed immediately inside the same
|
||||||
|
// thread, and then in the action we will call locate again, also in the same thread. If all the
|
||||||
|
// locations are in cache, and we do not use whenCompleteAsync to break the tie, the stack will be
|
||||||
|
// very very deep and cause stack overflow.
|
||||||
|
@VisibleForTesting
|
||||||
|
static final ThreadLocal<MutableInt> STACK_DEPTH = new ThreadLocal<MutableInt>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected MutableInt initialValue() {
|
||||||
|
return new MutableInt(0);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static final int MAX_STACK_DEPTH = 16;
|
||||||
|
|
||||||
|
private void locate(CompletableFuture<List<HRegionLocation>> future,
|
||||||
|
ConcurrentLinkedQueue<HRegionLocation> result, byte[] row) {
|
||||||
|
BiConsumer<HRegionLocation, Throwable> listener = (loc, error) -> {
|
||||||
|
if (error != null) {
|
||||||
|
future.completeExceptionally(error);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
result.add(loc);
|
||||||
|
if (ConnectionUtils.isEmptyStartRow(loc.getRegion().getStartKey())) {
|
||||||
|
future.complete(result.stream()
|
||||||
|
.sorted((l1, l2) -> RegionInfo.COMPARATOR.compare(l1.getRegion(), l2.getRegion()))
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
} else {
|
||||||
|
locate(future, result, loc.getRegion().getStartKey());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
MutableInt depth = STACK_DEPTH.get();
|
||||||
|
boolean async = depth.incrementAndGet() >= MAX_STACK_DEPTH;
|
||||||
|
try {
|
||||||
|
CompletableFuture<HRegionLocation> f =
|
||||||
|
locator.getRegionLocation(tableName, row, RegionLocateType.BEFORE, -1L);
|
||||||
|
if (async) {
|
||||||
|
FutureUtils.addListenerAsync(f, listener);
|
||||||
|
} else {
|
||||||
|
FutureUtils.addListener(f, listener);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (depth.decrementAndGet() == 0) {
|
||||||
|
STACK_DEPTH.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
|
||||||
|
ConcurrentLinkedQueue<HRegionLocation> result = new ConcurrentLinkedQueue<>();
|
||||||
|
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
|
||||||
|
// start from end to start, as when locating we will do reverse scan, so we will prefetch the
|
||||||
|
// location of the regions before the current one.
|
||||||
|
locate(future, result, HConstants.EMPTY_END_ROW);
|
||||||
|
return future;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,4 +57,23 @@ public final class FutureUtils {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Almost the same with the {@link #addListener(CompletableFuture, BiConsumer)} method above, the
|
||||||
|
* difference is that in this method we will call
|
||||||
|
* {@link CompletableFuture#whenCompleteAsync(BiConsumer)} instead of
|
||||||
|
* {@link CompletableFuture#whenComplete(BiConsumer)}.
|
||||||
|
* @see #addListener(CompletableFuture, BiConsumer)
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("FutureReturnValueIgnored")
|
||||||
|
public static <T> void addListenerAsync(CompletableFuture<T> future,
|
||||||
|
BiConsumer<? super T, ? super Throwable> action) {
|
||||||
|
future.whenCompleteAsync((resp, error) -> {
|
||||||
|
try {
|
||||||
|
action.accept(resp, error);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.error("Unexpected error caught when processing CompletableFuture", t);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,118 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import org.apache.commons.lang3.mutable.MutableInt;
|
||||||
|
import org.apache.commons.math3.util.Pair;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||||
|
|
||||||
|
@Category({ MediumTests.class, ClientTests.class })
|
||||||
|
public class TestAsyncTableRegionLocator {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestAsyncTableRegionLocator.class);
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static TableName TABLE_NAME = TableName.valueOf("async");
|
||||||
|
|
||||||
|
private static AsyncConnection CONN;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
TEST_UTIL.startMiniCluster(3);
|
||||||
|
TEST_UTIL.createMultiRegionTable(TABLE_NAME, Bytes.toBytes("cf"));
|
||||||
|
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
||||||
|
TEST_UTIL.getAdmin().balancerSwitch(false, true);
|
||||||
|
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
Closeables.close(CONN, true);
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertLocEquals(Map<RegionInfo, ServerName> region2Loc)
|
||||||
|
throws InterruptedException, ExecutionException {
|
||||||
|
for (HRegionLocation loc : CONN.getRegionLocator(TABLE_NAME).getAllRegionLocations().get()) {
|
||||||
|
ServerName expected = region2Loc.remove(loc.getRegion());
|
||||||
|
assertNotNull(expected);
|
||||||
|
assertEquals(expected, loc.getServerName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetAll() throws InterruptedException, ExecutionException {
|
||||||
|
Map<RegionInfo, ServerName> region2Loc = TEST_UTIL.getMiniHBaseCluster()
|
||||||
|
.getRegionServerThreads().stream().map(t -> t.getRegionServer())
|
||||||
|
.flatMap(rs -> rs.getRegions(TABLE_NAME).stream()
|
||||||
|
.map(r -> Pair.create(r.getRegionInfo(), rs.getServerName())))
|
||||||
|
.collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
|
||||||
|
MutableInt maxDepth = new MutableInt(0);
|
||||||
|
MutableInt depth = new MutableInt(0) {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 5887112211305087650L;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int incrementAndGet() {
|
||||||
|
int val = super.incrementAndGet();
|
||||||
|
if (val > maxDepth.intValue()) {
|
||||||
|
maxDepth.setValue(val);
|
||||||
|
}
|
||||||
|
return val;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// first time, read from meta
|
||||||
|
AsyncTableRegionLocatorImpl.STACK_DEPTH.set(depth);
|
||||||
|
assertLocEquals(new HashMap<>(region2Loc));
|
||||||
|
assertTrue(maxDepth.intValue() > 0);
|
||||||
|
assertTrue(maxDepth.intValue() <= AsyncTableRegionLocatorImpl.MAX_STACK_DEPTH);
|
||||||
|
|
||||||
|
// second time, read from cache
|
||||||
|
maxDepth.setValue(0);
|
||||||
|
depth.setValue(0);
|
||||||
|
AsyncTableRegionLocatorImpl.STACK_DEPTH.set(depth);
|
||||||
|
assertLocEquals(new HashMap<>(region2Loc));
|
||||||
|
assertTrue(maxDepth.intValue() > 0);
|
||||||
|
assertTrue(maxDepth.intValue() <= AsyncTableRegionLocatorImpl.MAX_STACK_DEPTH);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue