HBASE-26119 Polish TestAsyncNonMetaRegionLocator (#3526)

Signed-off-by: Xin Sun <ddupgs@gmail.com>
This commit is contained in:
Duo Zhang 2021-07-26 16:57:49 +08:00
parent 59e2825c93
commit be0a6cea40
1 changed files with 49 additions and 57 deletions

View File

@ -31,7 +31,6 @@ import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
@ -54,14 +53,14 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.slf4j.Logger; import org.junit.runners.Parameterized.Parameter;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@ -73,24 +72,21 @@ public class TestAsyncNonMetaRegionLocator {
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class); HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
private static final Logger LOG = LoggerFactory.getLogger(TestAsyncNonMetaRegionLocator.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("async"); private static final TableName TABLE_NAME = TableName.valueOf("async");
private static byte[] FAMILY = Bytes.toBytes("cf"); private static byte[] FAMILY = Bytes.toBytes("cf");
private static final int META_STOREFILE_REFRESH_PERIOD = 100;
private static final int NB_SERVERS = 4; private static final int NB_SERVERS = 4;
private static int numOfMetaReplica = NB_SERVERS - 1; private static final int NUM_OF_META_REPLICA = NB_SERVERS - 1;
private static AsyncConnectionImpl CONN;
private static AsyncNonMetaRegionLocator LOCATOR;
private static ConnectionRegistry registry;
private static byte[][] SPLIT_KEYS; private static byte[][] SPLIT_KEYS;
private CatalogReplicaMode metaReplicaMode;
private AsyncConnectionImpl conn;
private AsyncNonMetaRegionLocator locator;
@Parameter
public CatalogReplicaMode metaReplicaMode;
@BeforeClass @BeforeClass
public static void setUp() throws Exception { public static void setUp() throws Exception {
@ -98,16 +94,16 @@ public class TestAsyncNonMetaRegionLocator {
// Enable hbase:meta replication. // Enable hbase:meta replication.
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true); conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
conf.setLong("replication.source.sleepforretries", 10); // 10 ms conf.setLong("replication.source.sleepforretries", 10); // 10 ms
TEST_UTIL.startMiniCluster(NB_SERVERS); TEST_UTIL.startMiniCluster(NB_SERVERS);
Admin admin = TEST_UTIL.getAdmin(); Admin admin = TEST_UTIL.getAdmin();
admin.balancerSwitch(false, true); admin.balancerSwitch(false, true);
// Enable hbase:meta replication. // Enable hbase:meta replication.
HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, numOfMetaReplica); HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, NUM_OF_META_REPLICA);
TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getMiniHBaseCluster().getRegions( TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getMiniHBaseCluster()
TableName.META_TABLE_NAME).size() >= numOfMetaReplica); .getRegions(TableName.META_TABLE_NAME).size() >= NUM_OF_META_REPLICA);
registry = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
SPLIT_KEYS = new byte[8][]; SPLIT_KEYS = new byte[8][];
for (int i = 111; i < 999; i += 111) { for (int i = 111; i < 999; i += 111) {
SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
@ -116,40 +112,36 @@ public class TestAsyncNonMetaRegionLocator {
@AfterClass @AfterClass
public static void tearDown() throws Exception { public static void tearDown() throws Exception {
Closeables.close(CONN, true);
TEST_UTIL.shutdownMiniCluster(); TEST_UTIL.shutdownMiniCluster();
} }
@Before
public void setUpBeforeTest() throws InterruptedException, ExecutionException, IOException {
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
// Enable meta replica LoadBalance mode for this connection.
c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, metaReplicaMode.toString());
ConnectionRegistry registry =
ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
conn = new AsyncConnectionImpl(c, registry, null, User.getCurrent());
locator = new AsyncNonMetaRegionLocator(conn);
}
@After @After
public void tearDownAfterTest() throws IOException { public void tearDownAfterTest() throws IOException {
Admin admin = TEST_UTIL.getAdmin(); Admin admin = TEST_UTIL.getAdmin();
if (admin.tableExists(TABLE_NAME)) { if (admin.tableExists(TABLE_NAME)) {
if (admin.isTableEnabled(TABLE_NAME)) { if (admin.isTableEnabled(TABLE_NAME)) {
TEST_UTIL.getAdmin().disableTable(TABLE_NAME); admin.disableTable(TABLE_NAME);
} }
TEST_UTIL.getAdmin().deleteTable(TABLE_NAME); admin.deleteTable(TABLE_NAME);
} }
LOCATOR.clearCache(TABLE_NAME); Closeables.close(conn, true);
} }
@Parameterized.Parameters @Parameterized.Parameters
public static Collection<Object[]> parameters() { public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] { return Arrays
{ null }, .asList(new Object[][] { { CatalogReplicaMode.NONE }, { CatalogReplicaMode.LOAD_BALANCE } });
{ CatalogReplicaMode.LOAD_BALANCE.toString() }
});
}
public TestAsyncNonMetaRegionLocator(String clientMetaReplicaMode) throws Exception {
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
// Enable meta replica LoadBalance mode for this connection.
if (clientMetaReplicaMode != null) {
c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, clientMetaReplicaMode);
metaReplicaMode = CatalogReplicaMode.fromString(clientMetaReplicaMode);
}
CONN = new AsyncConnectionImpl(c, registry, registry.getClusterId().get(), User.getCurrent());
LOCATOR = new AsyncNonMetaRegionLocator(CONN);
} }
private void createSingleRegionTable() throws IOException, InterruptedException { private void createSingleRegionTable() throws IOException, InterruptedException {
@ -158,8 +150,8 @@ public class TestAsyncNonMetaRegionLocator {
} }
private CompletableFuture<HRegionLocation> getDefaultRegionLocation(TableName tableName, private CompletableFuture<HRegionLocation> getDefaultRegionLocation(TableName tableName,
byte[] row, RegionLocateType locateType, boolean reload) { byte[] row, RegionLocateType locateType, boolean reload) {
return LOCATOR return locator
.getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, locateType, reload) .getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, locateType, reload)
.thenApply(RegionLocations::getDefaultRegionLocation); .thenApply(RegionLocations::getDefaultRegionLocation);
} }
@ -189,7 +181,7 @@ public class TestAsyncNonMetaRegionLocator {
} }
private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName, private void assertLocEquals(byte[] startKey, byte[] endKey, ServerName serverName,
HRegionLocation loc) { HRegionLocation loc) {
RegionInfo info = loc.getRegion(); RegionInfo info = loc.getRegion();
assertEquals(TABLE_NAME, info.getTable()); assertEquals(TABLE_NAME, info.getTable());
assertArrayEquals(startKey, info.getStartKey()); assertArrayEquals(startKey, info.getStartKey());
@ -259,7 +251,7 @@ public class TestAsyncNonMetaRegionLocator {
} }
})); }));
LOCATOR.clearCache(TABLE_NAME); locator.clearCache(TABLE_NAME);
IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> { IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> {
try { try {
assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1], assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1],
@ -270,7 +262,7 @@ public class TestAsyncNonMetaRegionLocator {
} }
})); }));
LOCATOR.clearCache(TABLE_NAME); locator.clearCache(TABLE_NAME);
byte[][] endKeys = getEndKeys(); byte[][] endKeys = getEndKeys();
IntStream.range(0, 2).forEach( IntStream.range(0, 2).forEach(
n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> { n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> {
@ -302,11 +294,11 @@ public class TestAsyncNonMetaRegionLocator {
// Should be same as it is in cache // Should be same as it is in cache
assertSame(loc, assertSame(loc,
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get()); getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
LOCATOR.updateCachedLocationOnError(loc, null); locator.updateCachedLocationOnError(loc, null);
// null error will not trigger a cache cleanup // null error will not trigger a cache cleanup
assertSame(loc, assertSame(loc,
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get()); getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException()); locator.updateCachedLocationOnError(loc, new NotServingRegionException());
assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get()); getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get());
} }
@ -345,7 +337,7 @@ public class TestAsyncNonMetaRegionLocator {
byte[][] endKeys = getEndKeys(); byte[][] endKeys = getEndKeys();
ServerName[] serverNames = getLocations(startKeys); ServerName[] serverNames = getLocations(startKeys);
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
LOCATOR.clearCache(TABLE_NAME); locator.clearCache(TABLE_NAME);
List<CompletableFuture<HRegionLocation>> futures = List<CompletableFuture<HRegionLocation>> futures =
IntStream.range(0, 1000).mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s)) IntStream.range(0, 1000).mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s))
.map(r -> getDefaultRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false)) .map(r -> getDefaultRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false))
@ -395,8 +387,9 @@ public class TestAsyncNonMetaRegionLocator {
TEST_UTIL.waitFor(3000, new ExplainingPredicate<Exception>() { TEST_UTIL.waitFor(3000, new ExplainingPredicate<Exception>() {
@Override @Override
public boolean evaluate() throws Exception { public boolean evaluate() throws Exception {
HRegionLocation loc = getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, HRegionLocation loc =
RegionLocateType.CURRENT, true).get(); getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true)
.get();
return newServerName.equals(loc.getServerName()); return newServerName.equals(loc.getServerName());
} }
@ -416,7 +409,7 @@ public class TestAsyncNonMetaRegionLocator {
// Testcase for HBASE-20822 // Testcase for HBASE-20822
@Test @Test
public void testLocateBeforeLastRegion() public void testLocateBeforeLastRegion()
throws IOException, InterruptedException, ExecutionException { throws IOException, InterruptedException, ExecutionException {
createMultiRegionTable(); createMultiRegionTable();
getDefaultRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join(); getDefaultRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join();
HRegionLocation loc = HRegionLocation loc =
@ -434,14 +427,14 @@ public class TestAsyncNonMetaRegionLocator {
@Override @Override
public void updateCachedLocationOnError(HRegionLocation loc, Throwable error) public void updateCachedLocationOnError(HRegionLocation loc, Throwable error)
throws Exception { throws Exception {
LOCATOR.updateCachedLocationOnError(loc, error); locator.updateCachedLocationOnError(loc, error);
} }
@Override @Override
public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload) public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload)
throws Exception { throws Exception {
return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId, return locator.getRegionLocations(tableName, EMPTY_START_ROW, replicaId,
RegionLocateType.CURRENT, reload).get(); RegionLocateType.CURRENT, reload).get();
} }
}); });
@ -462,9 +455,8 @@ public class TestAsyncNonMetaRegionLocator {
public void testConcurrentUpdateCachedLocationOnError() throws Exception { public void testConcurrentUpdateCachedLocationOnError() throws Exception {
createSingleRegionTable(); createSingleRegionTable();
HRegionLocation loc = HRegionLocation loc =
getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false) getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get();
.get();
IntStream.range(0, 100).parallel() IntStream.range(0, 100).parallel()
.forEach(i -> LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException())); .forEach(i -> locator.updateCachedLocationOnError(loc, new NotServingRegionException()));
} }
} }