HBASE-18348 The implementation of AsyncTableRegionLocator does not follow the javadoc

This commit is contained in:
zhangduo 2017-07-10 15:47:25 +08:00
parent 43492d2d3b
commit f8e892d7aa
7 changed files with 203 additions and 44 deletions

View File

@ -45,11 +45,13 @@ class AsyncMetaRegionLocator {
this.registry = registry;
}
CompletableFuture<HRegionLocation> getRegionLocation() {
CompletableFuture<HRegionLocation> getRegionLocation(boolean reload) {
for (;;) {
HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
if (metaRegionLocation != null) {
return CompletableFuture.completedFuture(metaRegionLocation);
if (!reload) {
HRegionLocation metaRegionLocation = this.metaRegionLocation.get();
if (metaRegionLocation != null) {
return CompletableFuture.completedFuture(metaRegionLocation);
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("Meta region location cache is null, try fetching from registry.");

View File

@ -392,22 +392,26 @@ class AsyncNonMetaRegionLocator {
// placed before it. Used for reverse scan. See the comment of
// AsyncRegionLocator.getPreviousRegionLocation.
private CompletableFuture<HRegionLocation> getRegionLocationInternal(TableName tableName,
byte[] row, RegionLocateType locateType) {
byte[] row, RegionLocateType locateType, boolean reload) {
// AFTER should be convert to CURRENT before calling this method
assert !locateType.equals(RegionLocateType.AFTER);
TableCache tableCache = getTableCache(tableName);
HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType);
if (loc != null) {
return CompletableFuture.completedFuture(loc);
if (!reload) {
HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType);
if (loc != null) {
return CompletableFuture.completedFuture(loc);
}
}
CompletableFuture<HRegionLocation> future;
LocateRequest req;
boolean sendRequest = false;
synchronized (tableCache) {
// check again
loc = locateInCache(tableCache, tableName, row, locateType);
if (loc != null) {
return CompletableFuture.completedFuture(loc);
if (!reload) {
HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType);
if (loc != null) {
return CompletableFuture.completedFuture(loc);
}
}
req = new LocateRequest(row, locateType);
future = tableCache.allRequests.get(req);
@ -427,16 +431,16 @@ class AsyncNonMetaRegionLocator {
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
RegionLocateType locateType) {
RegionLocateType locateType, boolean reload) {
if (locateType.equals(RegionLocateType.BEFORE)) {
return getRegionLocationInternal(tableName, row, locateType);
return getRegionLocationInternal(tableName, row, locateType, reload);
} else {
// as we know the exact row after us, so we can just create the new row, and use the same
// algorithm to locate it.
if (locateType.equals(RegionLocateType.AFTER)) {
row = createClosestRowAfter(row);
}
return getRegionLocationInternal(tableName, row, RegionLocateType.CURRENT);
return getRegionLocationInternal(tableName, row, RegionLocateType.CURRENT, reload);
}
}

View File

@ -79,23 +79,28 @@ class AsyncRegionLocator {
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
RegionLocateType type, long timeoutNs) {
RegionLocateType type, boolean reload, long timeoutNs) {
// meta region can not be split right now so we always call the same method.
// Change it later if the meta table can have more than one regions.
CompletableFuture<HRegionLocation> future =
tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation()
: nonMetaRegionLocator.getRegionLocation(tableName, row, type);
tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation(reload)
: nonMetaRegionLocator.getRegionLocation(tableName, row, type, reload);
return withTimeout(future, timeoutNs,
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
+ "ms) waiting for region location for " + tableName + ", row='"
+ Bytes.toStringBinary(row) + "'");
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region location for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "'");
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
RegionLocateType type, long timeoutNs) {
return getRegionLocation(tableName, row, type, false, timeoutNs);
}
static boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) {
// Do not need to update if no such location, or the location is newer, or the location is not
// same with us
return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum()
&& oldLoc.getServerName().equals(loc.getServerName());
return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() &&
oldLoc.getServerName().equals(loc.getServerName());
}
static void updateCachedLocation(HRegionLocation loc, Throwable exception,

View File

@ -45,6 +45,6 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator {
@Override
public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, boolean reload) {
return locator.getRegionLocation(tableName, row, RegionLocateType.CURRENT, -1L);
return locator.getRegionLocation(tableName, row, RegionLocateType.CURRENT, reload, -1L);
}
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncMetaRegionLocator {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static AsyncRegistry REGISTRY;
private static AsyncMetaRegionLocator LOCATOR;
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().set(BaseLoadBalancer.TABLES_ON_MASTER, "none");
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.waitUntilAllSystemRegionsAssigned();
TEST_UTIL.getAdmin().setBalancerRunning(false, true);
REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
LOCATOR = new AsyncMetaRegionLocator(REGISTRY);
}
@AfterClass
public static void tearDown() throws Exception {
IOUtils.closeQuietly(REGISTRY);
TEST_UTIL.shutdownMiniCluster();
}
private Optional<ServerName> getRSCarryingMeta() {
return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
.map(t -> t.getRegionServer())
.filter(rs -> !rs.getOnlineRegions(TableName.META_TABLE_NAME).isEmpty()).findAny()
.map(rs -> rs.getServerName());
}
@Test
public void testReload() throws Exception {
ServerName serverName = getRSCarryingMeta().get();
assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName());
ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
.map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
.findAny().get();
TEST_UTIL.getAdmin().move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
Bytes.toBytes(newServerName.getServerName()));
TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
Optional<ServerName> newServerName = getRSCarryingMeta();
return newServerName.isPresent() && !newServerName.get().equals(serverName);
}
@Override
public String explainFailure() throws Exception {
return HRegionInfo.FIRST_META_REGIONINFO.getRegionNameAsString() + " is still on " +
serverName;
}
});
// The cached location will not change
assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName());
// should get the new location when reload = true
assertEquals(newServerName, LOCATOR.getRegionLocation(true).get().getServerName());
// the cached location should be replaced
assertEquals(newServerName, LOCATOR.getRegionLocation(false).get().getServerName());
}
}

View File

@ -51,6 +51,7 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncNonMetaRegionLocator {
@ -108,7 +109,7 @@ public class TestAsyncNonMetaRegionLocator {
public void testNoTable() throws InterruptedException {
for (RegionLocateType locateType : RegionLocateType.values()) {
try {
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType).get();
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
}
@ -121,7 +122,7 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
for (RegionLocateType locateType : RegionLocateType.values()) {
try {
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType).get();
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get();
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
}
@ -143,13 +144,13 @@ public class TestAsyncNonMetaRegionLocator {
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType).get());
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)];
ThreadLocalRandom.current().nextBytes(randKey);
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
LOCATOR.getRegionLocation(TABLE_NAME, randKey, locateType).get());
LOCATOR.getRegionLocation(TABLE_NAME, randKey, locateType, false).get());
}
}
@ -191,8 +192,8 @@ public class TestAsyncNonMetaRegionLocator {
IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
try {
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
serverNames[i],
LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT).get());
serverNames[i], LOCATOR
.getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false).get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@ -203,7 +204,7 @@ public class TestAsyncNonMetaRegionLocator {
try {
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
serverNames[i],
LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER).get());
LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@ -214,7 +215,8 @@ public class TestAsyncNonMetaRegionLocator {
n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
try {
assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i],
LOCATOR.getRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE).get());
LOCATOR.getRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false)
.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@ -225,8 +227,8 @@ public class TestAsyncNonMetaRegionLocator {
public void testRegionMove() throws IOException, InterruptedException, ExecutionException {
createSingleRegionTable();
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
HRegionLocation loc =
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT).get();
HRegionLocation loc = LOCATOR
.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc);
ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
.map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
@ -239,15 +241,15 @@ public class TestAsyncNonMetaRegionLocator {
Thread.sleep(100);
}
// Should be same as it is in cache
assertSame(loc,
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT).get());
assertSame(loc, LOCATOR
.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
LOCATOR.updateCachedLocation(loc, null);
// null error will not trigger a cache cleanup
assertSame(loc,
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT).get());
assertSame(loc, LOCATOR
.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
LOCATOR.updateCachedLocation(loc, new NotServingRegionException());
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT).get());
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, LOCATOR
.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
}
// usually locate after will return the same result, so we add a test to make it return different
@ -259,12 +261,12 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.createTable(TABLE_NAME, FAMILY, new byte[][] { splitKey });
TEST_UTIL.waitTableAvailable(TABLE_NAME);
HRegionLocation currentLoc =
LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT).get();
LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get();
ServerName currentServerName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
assertLocEquals(EMPTY_START_ROW, splitKey, currentServerName, currentLoc);
HRegionLocation afterLoc =
LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER).get();
LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get();
ServerName afterServerName =
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
.filter(rs -> rs.getOnlineRegions(TABLE_NAME).stream()
@ -272,7 +274,8 @@ public class TestAsyncNonMetaRegionLocator {
.findAny().get().getServerName();
assertLocEquals(splitKey, EMPTY_END_ROW, afterServerName, afterLoc);
assertSame(afterLoc, LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER).get());
assertSame(afterLoc,
LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get());
}
// For HBASE-17402
@ -286,7 +289,7 @@ public class TestAsyncNonMetaRegionLocator {
LOCATOR.clearCache(TABLE_NAME);
List<CompletableFuture<HRegionLocation>> futures = IntStream.range(0, 1000)
.mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
.map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT))
.map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
.collect(toList());
for (int j = 0; j < 1000; j++) {
int index = Math.min(8, j / 111);
@ -294,4 +297,47 @@ public class TestAsyncNonMetaRegionLocator {
}
}
}
@Test
public void testReload() throws Exception {
createSingleRegionTable();
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
.map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName))
.findAny().get();
Admin admin = TEST_UTIL.getAdmin();
HRegionInfo region = admin.getTableRegions(TABLE_NAME).stream().findAny().get();
admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(newServerName.getServerName()));
TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
ServerName newServerName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName();
return newServerName != null && !newServerName.equals(serverName);
}
@Override
public String explainFailure() throws Exception {
return region.getRegionNameAsString() + " is still on " + serverName;
}
});
// The cached location will not change
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName,
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
// should get the new location when reload = true
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get());
// the cached location should be replaced
for (RegionLocateType locateType : RegionLocateType.values()) {
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
}
}
}

View File

@ -148,7 +148,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
public void test() throws InterruptedException, ExecutionException {
List<CompletableFuture<HRegionLocation>> futures =
IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i)))
.map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT))
.map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
.collect(toList());
assertLocs(futures);
assertTrue(MAX_CONCURRENCY.get() <= MAX_ALLOWED);