HBASE-21682 Support getting from specific replica

This commit is contained in:
zhangduo 2019-01-07 20:34:01 +08:00 committed by Duo Zhang
parent c28e03e5df
commit d5fff9c256
2 changed files with 51 additions and 22 deletions

View File

@ -261,12 +261,17 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Result> get(Get get) {
if (get.getConsistency() == Consistency.STRONG) {
return get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
}
// user specifies a replica id explicitly, just send request to the specific replica
if (get.getReplicaId() >= 0) {
return get(get, get.getReplicaId(), readRpcTimeoutNs);
}
// Timeline consistent read, where we may send requests to other region replicas
CompletableFuture<Result> primaryFuture =
get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
if (get.getConsistency() == Consistency.STRONG) {
return primaryFuture;
}
// Timeline consistent read, where we will send requests to other region replicas
CompletableFuture<Result> future = new CompletableFuture<>();
connect(primaryFuture, future);
long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();

View File

@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -75,6 +77,8 @@ public class TestAsyncTableRegionReplicasGet {
private static byte[] VALUE = Bytes.toBytes("value");
private static int REPLICA_COUNT = 3;
private static AsyncConnection ASYNC_CONN;
@Rule
@ -99,9 +103,8 @@ public class TestAsyncTableRegionReplicasGet {
private static volatile boolean FAIL_PRIMARY_GET = false;
private static AtomicInteger PRIMARY_GET_COUNT = new AtomicInteger(0);
private static AtomicInteger SECONDARY_GET_COUNT = new AtomicInteger(0);
private static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
new ConcurrentHashMap<>();
public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
@ -117,13 +120,10 @@ public class TestAsyncTableRegionReplicasGet {
if (!region.getTable().equals(TABLE_NAME)) {
return;
}
if (region.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) {
SECONDARY_GET_COUNT.incrementAndGet();
} else {
PRIMARY_GET_COUNT.incrementAndGet();
if (FAIL_PRIMARY_GET) {
throw new IOException("Inject error");
}
REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger())
.incrementAndGet();
if (region.getRegionId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) {
throw new IOException("Inject error");
}
}
}
@ -152,10 +152,9 @@ public class TestAsyncTableRegionReplicasGet {
// infinite retry
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.getAdmin()
.createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3)
.setCoprocessor(FailPrimaryGetCP.class.getName()).build());
TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
.setCoprocessor(FailPrimaryGetCP.class.getName()).build());
TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
@ -172,10 +171,21 @@ public class TestAsyncTableRegionReplicasGet {
TEST_UTIL.shutdownMiniCluster();
}
private static int getSecondaryGetCount() {
return REPLICA_ID_TO_COUNT.entrySet().stream()
.filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
.mapToInt(e -> e.getValue().get()).sum();
}
private static int getPrimaryGetCount() {
AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
return primaryGetCount != null ? primaryGetCount.get() : 0;
}
@Test
public void testNoReplicaRead() throws Exception {
FAIL_PRIMARY_GET = false;
SECONDARY_GET_COUNT.set(0);
REPLICA_ID_TO_COUNT.clear();
AsyncTable<?> table = getTable.get();
Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
for (int i = 0; i < 1000; i++) {
@ -184,21 +194,35 @@ public class TestAsyncTableRegionReplicasGet {
// the primary region is fine and the primary timeout is 1 second which is long enough, so we
// should not send any requests to secondary replicas even if the consistency is timeline.
Thread.sleep(5000);
assertEquals(0, SECONDARY_GET_COUNT.get());
assertEquals(0, getSecondaryGetCount());
}
@Test
public void testReplicaRead() throws Exception {
// fail the primary get request
FAIL_PRIMARY_GET = true;
REPLICA_ID_TO_COUNT.clear();
Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
// make sure that we could still get the value from secondary replicas
AsyncTable<?> table = getTable.get();
assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
// make sure that the primary request has been canceled
Thread.sleep(5000);
int count = PRIMARY_GET_COUNT.get();
int count = getPrimaryGetCount();
Thread.sleep(10000);
assertEquals(count, PRIMARY_GET_COUNT.get());
assertEquals(count, getPrimaryGetCount());
}
@Test
public void testReadSpecificReplica() throws Exception {
FAIL_PRIMARY_GET = false;
REPLICA_ID_TO_COUNT.clear();
Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
AsyncTable<?> table = getTable.get();
for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
get.setReplicaId(replicaId);
assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());
}
}
}