diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 28db7e86e95..2ab9f6ac940 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -261,12 +261,17 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture get(Get get) { + if (get.getConsistency() == Consistency.STRONG) { + return get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs); + } + // user specifies a replica id explicitly, just send request to the specific replica + if (get.getReplicaId() >= 0) { + return get(get, get.getReplicaId(), readRpcTimeoutNs); + } + + // Timeline consistent read, where we may send requests to other region replicas CompletableFuture primaryFuture = get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs); - if (get.getConsistency() == Consistency.STRONG) { - return primaryFuture; - } - // Timeline consistent read, where we will send requests to other region replicas CompletableFuture future = new CompletableFuture<>(); connect(primaryFuture, future); long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java index 0445a0e8849..2117116d745 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -75,6 +77,8 @@ public class TestAsyncTableRegionReplicasGet { private static byte[] VALUE = Bytes.toBytes("value"); + private static int REPLICA_COUNT = 3; + private static AsyncConnection ASYNC_CONN; @Rule @@ -99,9 +103,8 @@ public class TestAsyncTableRegionReplicasGet { private static volatile boolean FAIL_PRIMARY_GET = false; - private static AtomicInteger PRIMARY_GET_COUNT = new AtomicInteger(0); - - private static AtomicInteger SECONDARY_GET_COUNT = new AtomicInteger(0); + private static ConcurrentMap REPLICA_ID_TO_COUNT = + new ConcurrentHashMap<>(); public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor { @@ -117,13 +120,10 @@ public class TestAsyncTableRegionReplicasGet { if (!region.getTable().equals(TABLE_NAME)) { return; } - if (region.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) { - SECONDARY_GET_COUNT.incrementAndGet(); - } else { - PRIMARY_GET_COUNT.incrementAndGet(); - if (FAIL_PRIMARY_GET) { - throw new IOException("Inject error"); - } + REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger()) + .incrementAndGet(); + if (region.getRegionId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) { + throw new IOException("Inject error"); } } } @@ -152,10 +152,9 @@ public class TestAsyncTableRegionReplicasGet { // infinite retry TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE); TEST_UTIL.startMiniCluster(3); - TEST_UTIL.getAdmin() - .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) - .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3) - .setCoprocessor(FailPrimaryGetCP.class.getName()).build()); + TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT) + .setCoprocessor(FailPrimaryGetCP.class.getName()).build()); TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); @@ -172,10 +171,21 @@ public class TestAsyncTableRegionReplicasGet { TEST_UTIL.shutdownMiniCluster(); } + private static int getSecondaryGetCount() { + return REPLICA_ID_TO_COUNT.entrySet().stream() + .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID) + .mapToInt(e -> e.getValue().get()).sum(); + } + + private static int getPrimaryGetCount() { + AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID); + return primaryGetCount != null ? primaryGetCount.get() : 0; + } + @Test public void testNoReplicaRead() throws Exception { FAIL_PRIMARY_GET = false; - SECONDARY_GET_COUNT.set(0); + REPLICA_ID_TO_COUNT.clear(); AsyncTable table = getTable.get(); Get get = new Get(ROW).setConsistency(Consistency.TIMELINE); for (int i = 0; i < 1000; i++) { @@ -184,21 +194,35 @@ public class TestAsyncTableRegionReplicasGet { // the primary region is fine and the primary timeout is 1 second which is long enough, so we // should not send any requests to secondary replicas even if the consistency is timeline. Thread.sleep(5000); - assertEquals(0, SECONDARY_GET_COUNT.get()); + assertEquals(0, getSecondaryGetCount()); } @Test public void testReplicaRead() throws Exception { // fail the primary get request FAIL_PRIMARY_GET = true; + REPLICA_ID_TO_COUNT.clear(); Get get = new Get(ROW).setConsistency(Consistency.TIMELINE); // make sure that we could still get the value from secondary replicas AsyncTable table = getTable.get(); assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER)); // make sure that the primary request has been canceled Thread.sleep(5000); - int count = PRIMARY_GET_COUNT.get(); + int count = getPrimaryGetCount(); Thread.sleep(10000); - assertEquals(count, PRIMARY_GET_COUNT.get()); + assertEquals(count, getPrimaryGetCount()); + } + + @Test + public void testReadSpecificReplica() throws Exception { + FAIL_PRIMARY_GET = false; + REPLICA_ID_TO_COUNT.clear(); + Get get = new Get(ROW).setConsistency(Consistency.TIMELINE); + AsyncTable table = getTable.get(); + for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) { + get.setReplicaId(replicaId); + assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER)); + assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get()); + } } }