HBASE-26119 Polish TestAsyncNonMetaRegionLocator (#3526)

Signed-off-by: Xin Sun <ddupgs@gmail.com>
This commit is contained in:
Duo Zhang 2021-07-26 16:57:49 +08:00 committed by GitHub
parent 8ae394285a
commit f0324a7516
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 49 additions and 58 deletions

View File

@ -53,14 +53,14 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.runners.Parameterized.Parameter;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@ -72,42 +72,37 @@ public class TestAsyncNonMetaRegionLocator {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
private static final Logger LOG = LoggerFactory.getLogger(TestAsyncNonMetaRegionLocator.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static TableName TABLE_NAME = TableName.valueOf("async");
private static final TableName TABLE_NAME = TableName.valueOf("async");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static final int META_STOREFILE_REFRESH_PERIOD = 100;
private static final int NB_SERVERS = 4;
private static int numOfMetaReplica = NB_SERVERS - 1;
private static AsyncConnectionImpl CONN;
private static AsyncNonMetaRegionLocator LOCATOR;
private static ConnectionRegistry registry;
private static final int NUM_OF_META_REPLICA = NB_SERVERS - 1;
private static byte[][] SPLIT_KEYS;
private CatalogReplicaMode metaReplicaMode;
private AsyncConnectionImpl conn;
private AsyncNonMetaRegionLocator locator;
@Parameter
public CatalogReplicaMode metaReplicaMode;
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
// Enable hbase:meta replication.
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
conf.setLong("replication.source.sleepforretries", 10); // 10 ms
conf.setLong("replication.source.sleepforretries", 10); // 10 ms
TEST_UTIL.startMiniCluster(NB_SERVERS);
Admin admin = TEST_UTIL.getAdmin();
admin.balancerSwitch(false, true);
// Enable hbase:meta replication.
HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, numOfMetaReplica);
TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getMiniHBaseCluster().getRegions(
TableName.META_TABLE_NAME).size() >= numOfMetaReplica);
registry = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, NUM_OF_META_REPLICA);
TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getMiniHBaseCluster()
.getRegions(TableName.META_TABLE_NAME).size() >= NUM_OF_META_REPLICA);
SPLIT_KEYS = new byte[8][];
for (int i = 111; i < 999; i += 111) {
@ -117,41 +112,37 @@ public class TestAsyncNonMetaRegionLocator {
@AfterClass
public static void tearDown() throws Exception {
Closeables.close(CONN, true);
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void setUpBeforeTest() throws InterruptedException, ExecutionException, IOException {
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
// Enable meta replica LoadBalance mode for this connection.
c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, metaReplicaMode.toString());
ConnectionRegistry registry =
ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
conn =
new AsyncConnectionImpl(c, registry, registry.getClusterId().get(), null, User.getCurrent());
locator = new AsyncNonMetaRegionLocator(conn);
}
@After
public void tearDownAfterTest() throws IOException {
Admin admin = TEST_UTIL.getAdmin();
if (admin.tableExists(TABLE_NAME)) {
if (admin.isTableEnabled(TABLE_NAME)) {
TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
admin.disableTable(TABLE_NAME);
}
TEST_UTIL.getAdmin().deleteTable(TABLE_NAME);
admin.deleteTable(TABLE_NAME);
}
LOCATOR.clearCache(TABLE_NAME);
Closeables.close(conn, true);
}
@Parameterized.Parameters
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{ null },
{ CatalogReplicaMode.LOAD_BALANCE.toString() }
});
}
public TestAsyncNonMetaRegionLocator(String clientMetaReplicaMode) throws Exception {
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
// Enable meta replica LoadBalance mode for this connection.
if (clientMetaReplicaMode != null) {
c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, clientMetaReplicaMode);
metaReplicaMode = CatalogReplicaMode.fromString(clientMetaReplicaMode);
}
CONN = new AsyncConnectionImpl(c, registry,
registry.getClusterId().get(), null, User.getCurrent());
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
return Arrays
.asList(new Object[][] { { CatalogReplicaMode.NONE }, { CatalogReplicaMode.LOAD_BALANCE } });
}
private void createSingleRegionTable() throws IOException, InterruptedException {
@ -160,8 +151,8 @@ public class TestAsyncNonMetaRegionLocator {
}
private CompletableFuture<HRegionLocation> getDefaultRegionLocation(TableName tableName,
byte[] row, RegionLocateType locateType, boolean reload) {
return LOCATOR
byte[] row, RegionLocateType locateType, boolean reload) {
return locator
.getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, locateType, reload)
.thenApply(RegionLocations::getDefaultRegionLocation);
}
@ -191,7 +182,7 @@ public class TestAsyncNonMetaRegionLocator {
}
private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName,
HRegionLocation loc) {
HRegionLocation loc) {
RegionInfo info = loc.getRegion();
assertEquals(TABLE_NAME, info.getTable());
assertArrayEquals(startKey, info.getStartKey());
@ -261,7 +252,7 @@ public class TestAsyncNonMetaRegionLocator {
}
}));
LOCATOR.clearCache(TABLE_NAME);
locator.clearCache(TABLE_NAME);
IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
try {
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
@ -272,7 +263,7 @@ public class TestAsyncNonMetaRegionLocator {
}
}));
LOCATOR.clearCache(TABLE_NAME);
locator.clearCache(TABLE_NAME);
byte[][] endKeys = getEndKeys();
IntStream.range(0, 2).forEach(
n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
@ -304,11 +295,11 @@ public class TestAsyncNonMetaRegionLocator {
// Should be same as it is in cache
assertSame(loc,
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
LOCATOR.updateCachedLocationOnError(loc, null);
locator.updateCachedLocationOnError(loc, null);
// null error will not trigger a cache cleanup
assertSame(loc,
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException());
locator.updateCachedLocationOnError(loc, new NotServingRegionException());
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
}
@ -347,7 +338,7 @@ public class TestAsyncNonMetaRegionLocator {
byte[][] endKeys = getEndKeys();
ServerName[] serverNames = getLocations(startKeys);
for (int i = 0; i < 100; i++) {
LOCATOR.clearCache(TABLE_NAME);
locator.clearCache(TABLE_NAME);
List<CompletableFuture<HRegionLocation>> futures =
IntStream.range(0, 1000).mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
.map(r -> getDefaultRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
@ -397,8 +388,9 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.waitFor(3000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
HRegionLocation loc = getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW,
RegionLocateType.CURRENT, true).get();
HRegionLocation loc =
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true)
.get();
return newServerName.equals(loc.getServerName());
}
@ -418,7 +410,7 @@ public class TestAsyncNonMetaRegionLocator {
// Testcase for HBASE-20822
@Test
public void testLocateBeforeLastRegion()
throws IOException, InterruptedException, ExecutionException {
throws IOException, InterruptedException, ExecutionException {
createMultiRegionTable();
getDefaultRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
HRegionLocation loc =
@ -436,14 +428,14 @@ public class TestAsyncNonMetaRegionLocator {
@Override
public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
throws Exception {
LOCATOR.updateCachedLocationOnError(loc, error);
throws Exception {
locator.updateCachedLocationOnError(loc, error);
}
@Override
public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
throws Exception {
return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
throws Exception {
return locator.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
RegionLocateType.CURRENT, reload).get();
}
});
@ -464,9 +456,8 @@ public class TestAsyncNonMetaRegionLocator {
public void testConcurrentUpdateCachedLocationOnError() throws Exception {
createSingleRegionTable();
HRegionLocation loc =
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false)
.get();
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
IntStream.range(0, 100).parallel()
.forEach(i -> LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException()));
.forEach(i -> locator.updateCachedLocationOnError(loc, new NotServingRegionException()));
}
}