HBASE-21682 Support getting from specific replica

This commit is contained in:
zhangduo 2019-01-07 20:34:01 +08:00 committed by Duo Zhang
parent 184cff0d4d
commit 3de116af46
2 changed files with 51 additions and 22 deletions

View File

@ -261,12 +261,17 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override @Override
public CompletableFuture<Result> get(Get get) { public CompletableFuture<Result> get(Get get) {
if (get.getConsistency() == Consistency.STRONG) {
return get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
}
// user specifies a replica id explicitly, just send request to the specific replica
if (get.getReplicaId() >= 0) {
return get(get, get.getReplicaId(), readRpcTimeoutNs);
}
// Timeline consistent read, where we may send requests to other region replicas
CompletableFuture<Result> primaryFuture = CompletableFuture<Result> primaryFuture =
get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs); get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
if (get.getConsistency() == Consistency.STRONG) {
return primaryFuture;
}
// Timeline consistent read, where we will send requests to other region replicas
CompletableFuture<Result> future = new CompletableFuture<>(); CompletableFuture<Result> future = new CompletableFuture<>();
connect(primaryFuture, future); connect(primaryFuture, future);
long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs(); long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();

View File

@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -75,6 +77,8 @@ public class TestAsyncTableRegionReplicasGet {
private static byte[] VALUE = Bytes.toBytes("value"); private static byte[] VALUE = Bytes.toBytes("value");
private static int REPLICA_COUNT = 3;
private static AsyncConnection ASYNC_CONN; private static AsyncConnection ASYNC_CONN;
@Rule @Rule
@ -99,9 +103,8 @@ public class TestAsyncTableRegionReplicasGet {
private static volatile boolean FAIL_PRIMARY_GET = false; private static volatile boolean FAIL_PRIMARY_GET = false;
private static AtomicInteger PRIMARY_GET_COUNT = new AtomicInteger(0); private static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
new ConcurrentHashMap<>();
private static AtomicInteger SECONDARY_GET_COUNT = new AtomicInteger(0);
public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor { public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
@ -117,13 +120,10 @@ public class TestAsyncTableRegionReplicasGet {
if (!region.getTable().equals(TABLE_NAME)) { if (!region.getTable().equals(TABLE_NAME)) {
return; return;
} }
if (region.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) { REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger())
SECONDARY_GET_COUNT.incrementAndGet(); .incrementAndGet();
} else { if (region.getRegionId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) {
PRIMARY_GET_COUNT.incrementAndGet(); throw new IOException("Inject error");
if (FAIL_PRIMARY_GET) {
throw new IOException("Inject error");
}
} }
} }
} }
@ -152,10 +152,9 @@ public class TestAsyncTableRegionReplicasGet {
// infinite retry // infinite retry
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);
TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);
TEST_UTIL.getAdmin() TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
.createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3) .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
.setCoprocessor(FailPrimaryGetCP.class.getName()).build());
TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME); TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME); AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
@ -172,10 +171,21 @@ public class TestAsyncTableRegionReplicasGet {
TEST_UTIL.shutdownMiniCluster(); TEST_UTIL.shutdownMiniCluster();
} }
private static int getSecondaryGetCount() {
return REPLICA_ID_TO_COUNT.entrySet().stream()
.filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
.mapToInt(e -> e.getValue().get()).sum();
}
private static int getPrimaryGetCount() {
AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
return primaryGetCount != null ? primaryGetCount.get() : 0;
}
@Test @Test
public void testNoReplicaRead() throws Exception { public void testNoReplicaRead() throws Exception {
FAIL_PRIMARY_GET = false; FAIL_PRIMARY_GET = false;
SECONDARY_GET_COUNT.set(0); REPLICA_ID_TO_COUNT.clear();
AsyncTable<?> table = getTable.get(); AsyncTable<?> table = getTable.get();
Get get = new Get(ROW).setConsistency(Consistency.TIMELINE); Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
@ -184,21 +194,35 @@ public class TestAsyncTableRegionReplicasGet {
// the primary region is fine and the primary timeout is 1 second which is long enough, so we // the primary region is fine and the primary timeout is 1 second which is long enough, so we
// should not send any requests to secondary replicas even if the consistency is timeline. // should not send any requests to secondary replicas even if the consistency is timeline.
Thread.sleep(5000); Thread.sleep(5000);
assertEquals(0, SECONDARY_GET_COUNT.get()); assertEquals(0, getSecondaryGetCount());
} }
@Test @Test
public void testReplicaRead() throws Exception { public void testReplicaRead() throws Exception {
// fail the primary get request // fail the primary get request
FAIL_PRIMARY_GET = true; FAIL_PRIMARY_GET = true;
REPLICA_ID_TO_COUNT.clear();
Get get = new Get(ROW).setConsistency(Consistency.TIMELINE); Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
// make sure that we could still get the value from secondary replicas // make sure that we could still get the value from secondary replicas
AsyncTable<?> table = getTable.get(); AsyncTable<?> table = getTable.get();
assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER)); assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
// make sure that the primary request has been canceled // make sure that the primary request has been canceled
Thread.sleep(5000); Thread.sleep(5000);
int count = PRIMARY_GET_COUNT.get(); int count = getPrimaryGetCount();
Thread.sleep(10000); Thread.sleep(10000);
assertEquals(count, PRIMARY_GET_COUNT.get()); assertEquals(count, getPrimaryGetCount());
}
@Test
public void testReadSpecificReplica() throws Exception {
FAIL_PRIMARY_GET = false;
REPLICA_ID_TO_COUNT.clear();
Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
AsyncTable<?> table = getTable.get();
for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
get.setReplicaId(replicaId);
assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());
}
} }
} }