HBASE-25272 Support scan on a specific replica (#2645)

Signed-off-by: stack <stack@apache.org>
Signed-off-by: Huaxiang Sun <huaxiangsun@apache.org>
This commit is contained in:
Duo Zhang 2020-11-12 21:25:53 +08:00 committed by GitHub
parent 873bef1d7e
commit b19df076a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 432 additions and 568 deletions

View File

@ -21,33 +21,32 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.LeaseException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.LeaseException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
@ -142,6 +141,11 @@ public abstract class ClientScanner extends AbstractClientScanner {
initCache();
}
/**
 * Resolves the replica id that scanner callables created by this scanner should target.
 * @return the replica id configured on the scan when it is at least
 *         {@link RegionReplicaUtil#DEFAULT_REPLICA_ID}; otherwise
 *         {@link RegionReplicaUtil#DEFAULT_REPLICA_ID}
 */
protected final int getScanReplicaId() {
  final int requestedReplicaId = scan.getReplicaId();
  if (requestedReplicaId < RegionReplicaUtil.DEFAULT_REPLICA_ID) {
    // The scan carries a value below the default replica id, so fall back to the default.
    return RegionReplicaUtil.DEFAULT_REPLICA_ID;
  }
  return requestedReplicaId;
}
/**
 * @return the cluster connection held by this scanner
 */
protected ClusterConnection getConnection() {
  return connection;
}

View File

@ -61,6 +61,6 @@ public class ClientSimpleScanner extends ClientScanner {
scan.withStartRow(createClosestRowAfter(scan.getStartRow()), true);
}
return new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
this.rpcControllerFactory);
this.rpcControllerFactory, getScanReplicaId());
}
}

View File

@ -37,13 +37,6 @@ public class ReversedClientScanner extends ClientScanner {
/**
* Create a new ReversibleClientScanner for the specified table Note that the passed
* {@link Scan}'s start row maybe changed.
* @param conf
* @param scan
* @param tableName
* @param connection
* @param pool
* @param primaryOperationTimeout
* @throws IOException
*/
public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
@ -65,6 +58,6 @@ public class ReversedClientScanner extends ClientScanner {
@Override
protected ReversedScannerCallable createScannerCallable() {
return new ReversedScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
this.rpcControllerFactory);
this.rpcControllerFactory, getScanReplicaId());
}
}

View File

@ -45,23 +45,11 @@ import org.apache.hadoop.hbase.util.Bytes;
public class ReversedScannerCallable extends ScannerCallable {
/**
* @param connection
* @param tableName
* @param scan
* @param scanMetrics
* @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the
* regionserver
*/
public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcFactory) {
super(connection, tableName, scan, scanMetrics, rpcFactory);
}
/**
* @param connection
* @param tableName
* @param scan
* @param scanMetrics
* @param connection which connection
* @param tableName table callable is on
* @param scan the scan to execute
* @param scanMetrics the ScanMetrics to use; if it is null, ScannerCallable won't collect
* metrics
* @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the
* regionserver
* @param replicaId the replica id
@ -73,7 +61,6 @@ public class ReversedScannerCallable extends ScannerCallable {
/**
* @param reload force reload of server location
* @throws IOException
*/
@Override
public void prepare(boolean reload) throws IOException {
@ -86,9 +73,8 @@ public class ReversedScannerCallable extends ScannerCallable {
// 2. the start row is empty which means we need to locate to the last region.
if (scan.includeStartRow() && !isEmptyStartRow(getRow())) {
// Just locate the region with the row
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id,
getConnection(), getTableName(), getRow());
this.location = id < rl.size() ? rl.getRegionLocation(id) : null;
RegionLocations rl = getRegionLocations(reload, getRow());
this.location = getLocationForReplica(rl);
if (location == null || location.getServerName() == null) {
throw new IOException("Failed to find location, tableName="
+ getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload="
@ -126,7 +112,6 @@ public class ReversedScannerCallable extends ScannerCallable {
* @param reload force reload of server location
* @return A list of HRegionLocation corresponding to the regions that contain
* the specified range
* @throws IOException
*/
private List<HRegionLocation> locateRegionsInRange(byte[] startKey,
byte[] endKey, boolean reload) throws IOException {
@ -140,15 +125,14 @@ public class ReversedScannerCallable extends ScannerCallable {
List<HRegionLocation> regionList = new ArrayList<>();
byte[] currentKey = startKey;
do {
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id,
getConnection(), getTableName(), currentKey);
HRegionLocation regionLocation = id < rl.size() ? rl.getRegionLocation(id) : null;
if (regionLocation != null && regionLocation.getRegionInfo().containsRow(currentKey)) {
RegionLocations rl = getRegionLocations(reload, currentKey);
HRegionLocation regionLocation = getLocationForReplica(rl);
if (regionLocation.getRegionInfo().containsRow(currentKey)) {
regionList.add(regionLocation);
} else {
throw new DoNotRetryIOException("Does hbase:meta exist hole? Locating row "
+ Bytes.toStringBinary(currentKey) + " returns incorrect region "
+ (regionLocation == null ? null : regionLocation.getRegionInfo()));
throw new DoNotRetryIOException(
"Does hbase:meta exist hole? Locating row " + Bytes.toStringBinary(currentKey) +
" returns incorrect region " + regionLocation.getRegionInfo());
}
currentKey = regionLocation.getRegionInfo().getEndKey();
} while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)

View File

@ -26,7 +26,6 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMet
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
@ -37,13 +36,14 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@ -103,18 +103,6 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
* @param rpcControllerFactory factory to use when creating
* {@link com.google.protobuf.RpcController}
*/
public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
}
/**
*
* @param connection
* @param tableName
* @param scan
* @param scanMetrics
* @param id the replicaId
*/
public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(), scan.getPriority());
@ -127,23 +115,33 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
this.rpcControllerFactory = rpcControllerFactory;
}
/**
 * Picks the location of the replica this callable targets (field {@code id}) out of the given
 * region locations.
 * @param locs the locations of all replicas of the region
 * @return the location for replica {@code id}, with a non-null server name
 * @throws HBaseIOException if {@code locs} has no entry for the replica, or the entry has no
 *           server name
 */
protected final HRegionLocation getLocationForReplica(RegionLocations locs)
    throws HBaseIOException {
  HRegionLocation replicaLocation = null;
  if (id < locs.size()) {
    replicaLocation = locs.getRegionLocation(id);
  }
  if (replicaLocation != null && replicaLocation.getServerName() != null) {
    return replicaLocation;
  }
  // With this exception, there will be a retry. The location can be null for a replica
  // when the table is created or after a split.
  throw new HBaseIOException("There is no location for replica id #" + id);
}
/**
 * Looks up the region locations for the given row of this callable's table, for the replica
 * this callable targets (field {@code id}).
 * @param reload force reload of server location
 * @param row the row to locate
 * @return the locations returned by {@link RpcRetryingCallerWithReadReplicas}
 * @throws IOException if the lookup fails
 */
protected final RegionLocations getRegionLocations(boolean reload, byte[] row)
    throws IOException {
  // NOTE(review): the first argument is presumably a "use cache" flag, hence the negation of
  // reload -- confirm against RpcRetryingCallerWithReadReplicas.getRegionLocations.
  final boolean notReload = !reload;
  return RpcRetryingCallerWithReadReplicas.getRegionLocations(notReload, id, getConnection(),
    getTableName(), row);
}
/**
* @param reload force reload of server location
* @throws IOException
*/
@Override
public void prepare(boolean reload) throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
id, getConnection(), getTableName(), getRow());
location = id < rl.size() ? rl.getRegionLocation(id) : null;
if (location == null || location.getServerName() == null) {
// With this exception, there will be a retry. The location can be null for a replica
// when the table is created or after a split.
throw new HBaseIOException("There is no location for replica id #" + id);
}
RegionLocations rl = getRegionLocations(reload, getRow());
location = getLocationForReplica(rl);
ServerName dest = location.getServerName();
setStub(super.getConnection().getClient(dest));
if (!instantiated || reload) {

View File

@ -187,7 +187,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
AtomicBoolean done = new AtomicBoolean(false);
replicaSwitched.set(false);
// submit call for the primary replica.
// submit call for the primary replica or user specified replica
addCallsForCurrentReplica(cs);
int startIndex = 0;
@ -209,13 +209,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
LOG.debug("Scan with primary region returns " + e.getCause());
}
// If rl's size is 1 or scan's consitency is strong, it needs to throw
// out the exception from the primary replica
if ((regionReplication == 1) || (scan.getConsistency() == Consistency.STRONG)) {
// If rl's size is 1 or scan's consistency is strong, or scan is over a specific replica,
// it needs to throw out the exception from the primary replica
if (regionReplication == 1 || scan.getConsistency() == Consistency.STRONG ||
scan.getReplicaId() >= 0) {
// Rethrow the first exception
RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
}
startIndex = 1;
} catch (CancellationException e) {
throw new InterruptedIOException(e.getMessage());
@ -225,8 +225,9 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// submit call for the all of the secondaries at once
int endIndex = regionReplication;
if (scan.getConsistency() == Consistency.STRONG) {
// When scan's consistency is strong, do not send to the secondaries
if (scan.getConsistency() == Consistency.STRONG || scan.getReplicaId() >= 0) {
// When scan's consistency is strong or scan is over specific replica region, do not send to
// the secondaries
endIndex = 1;
} else {
// TODO: this may be an overkill for large region replication

View File

@ -75,7 +75,7 @@ public class TestReversedScannerCallable {
Mockito.when(connection.relocateRegion(tableName, ROW, 0)).thenReturn(regionLocations);
ReversedScannerCallable callable =
new ReversedScannerCallable(connection, tableName, scan, null, rpcFactory);
new ReversedScannerCallable(connection, tableName, scan, null, rpcFactory, 0);
callable.prepare(true);
Mockito.verify(connection).relocateRegion(tableName, ROW, 0);
@ -88,7 +88,7 @@ public class TestReversedScannerCallable {
.thenReturn(regionLocations);
ReversedScannerCallable callable =
new ReversedScannerCallable(connection, tableName, scan, null, rpcFactory);
new ReversedScannerCallable(connection, tableName, scan, null, rpcFactory, 0);
callable.prepare(false);
Mockito.verify(connection).locateRegion(tableName, ROW, true, true, 0);

View File

@ -17,14 +17,16 @@
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import com.codahale.metrics.Counter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -36,13 +38,12 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@ -57,11 +58,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
@ -76,7 +75,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
* cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
*/
@Category({LargeTests.class, ClientTests.class})
@SuppressWarnings("deprecation")
public class TestReplicasClient {
@ClassRule
@ -85,12 +83,12 @@ public class TestReplicasClient {
private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class);
private static final int NB_SERVERS = 1;
private static Table table = null;
private static TableName TABLE_NAME;
private Table table = null;
private static final byte[] row = TestReplicasClient.class.getName().getBytes();
private static HRegionInfo hriPrimary;
private static HRegionInfo hriSecondary;
private static RegionInfo hriPrimary;
private static RegionInfo hriSecondary;
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
private static final byte[] f = HConstants.CATALOG_FAMILY;
@ -101,6 +99,8 @@ public class TestReplicasClient {
* This copro is used to synchronize the tests.
*/
public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
static final AtomicInteger primaryCountOfScan = new AtomicInteger(0);
static final AtomicInteger secondaryCountOfScan = new AtomicInteger(0);
static final AtomicLong sleepTime = new AtomicLong(0);
static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
static final AtomicInteger countOfNext = new AtomicInteger(0);
@ -117,21 +117,33 @@ public class TestReplicasClient {
}
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
final List<Cell> results) throws IOException {
slowdownCode(e);
}
/**
 * Records one scan call against the region this coprocessor is attached to: bumps
 * {@code primaryCountOfScan} when the region's replica id is 0, otherwise
 * {@code secondaryCountOfScan}.
 * @param e the observer context used to reach the region info
 */
private void incrementScanCount(ObserverContext<RegionCoprocessorEnvironment> e) {
  final int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
  // NOTE(review): the trailing Exception makes the logger print a stack trace for every call;
  // this looks like a debugging aid -- confirm it is still wanted.
  LOG.info("==========scan {} ", replicaId, new Exception());
  final AtomicInteger counter = replicaId == 0 ? primaryCountOfScan : secondaryCountOfScan;
  counter.incrementAndGet();
}
/**
 * Counts the scanner-open call for this region's replica, then runs the shared slowdown hook.
 */
@Override
public void preScannerOpen(
    final ObserverContext<RegionCoprocessorEnvironment> e,
    final Scan scan)
    throws IOException {
  // Count first so the call is recorded even if the slowdown hook blocks.
  incrementScanCount(e);
  slowdownCode(e);
}
@Override
public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
final InternalScanner s, final List<Result> results,
final int limit, final boolean hasMore) throws IOException {
final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore)
throws IOException {
incrementScanCount(e);
// this will slow down a certain next operation if the conditions are met. The slowness
// will allow the call to go to a replica
if (slowDownNext.get()) {
@ -201,17 +213,17 @@ public class TestReplicasClient {
HTU.startMiniCluster(option);
// Create table then get the single region for our new table.
HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName());
TABLE_NAME = TableName.valueOf(TestReplicasClient.class.getSimpleName());
HTableDescriptor hdt = HTU.createTableDescriptor(TABLE_NAME);
hdt.addCoprocessor(SlowMeCopro.class.getName());
table = HTU.createTable(hdt, new byte[][]{f}, null);
HTU.createTable(hdt, new byte[][]{f}, null);
try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) {
hriPrimary = locator.getRegionLocation(row, false).getRegionInfo();
try (RegionLocator locator = HTU.getConnection().getRegionLocator(TABLE_NAME)) {
hriPrimary = locator.getRegionLocation(row, false).getRegion();
}
// mock a secondary region info to open
hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
// No master
LOG.info("Master is going to be stopped");
@ -224,13 +236,11 @@ public class TestReplicasClient {
@AfterClass
public static void afterClass() throws Exception {
HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
if (table != null) table.close();
HTU.shutdownMiniCluster();
}
@Before
public void before() throws IOException {
((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionLocationCache();
try {
openRegion(hriPrimary);
} catch (Exception ignored) {
@ -239,10 +249,31 @@ public class TestReplicasClient {
openRegion(hriSecondary);
} catch (Exception ignored) {
}
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.sleepTime.set(0);
SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(0));
table = HTU.getConnection().getTable(TABLE_NAME);
try (ResultScanner scanner = table.getScanner(new Scan())) {
for (;;) {
Result result = scanner.next();
if (result == null) {
break;
}
table.delete(new Delete(result.getRow()));
}
}
flushRegion(hriPrimary);
HTU.getConnection().clearRegionLocationCache();
SlowMeCopro.primaryCountOfScan.set(0);
SlowMeCopro.secondaryCountOfScan.set(0);
SlowMeCopro.countOfNext.set(0);
}
@After
public void after() throws IOException, KeeperException {
SlowMeCopro.getPrimaryCdl().get().countDown();
SlowMeCopro.getSecondaryCdl().get().countDown();
try {
closeRegion(hriSecondary);
} catch (Exception ignored) {
@ -251,45 +282,50 @@ public class TestReplicasClient {
closeRegion(hriPrimary);
} catch (Exception ignored) {
}
((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionLocationCache();
if (table != null) {
table.close();
}
HTU.getConnection().clearRegionLocationCache();
}
/**
 * @return the region server at index 0 of the mini cluster started by {@code HTU}
 */
private HRegionServer getRS() {
return HTU.getMiniHBaseCluster().getRegionServer(0);
}
private void openRegion(HRegionInfo hri) throws Exception {
private void openRegion(RegionInfo hri) throws Exception {
try {
if (isRegionOpened(hri)) return;
} catch (Exception e){}
if (isRegionOpened(hri)) {
return;
}
} catch (Exception e) {
}
// first version is '0'
AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
getRS().getServerName(), hri, null);
AdminProtos.OpenRegionRequest orr =
RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, null);
AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
Assert.assertEquals(1, responseOpen.getOpeningStateCount());
Assert.assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
assertEquals(1, responseOpen.getOpeningStateCount());
assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
responseOpen.getOpeningState(0));
checkRegionIsOpened(hri);
}
private void closeRegion(HRegionInfo hri) throws Exception {
private void closeRegion(RegionInfo hri) throws Exception {
AdminProtos.CloseRegionRequest crr = ProtobufUtil.buildCloseRegionRequest(
getRS().getServerName(), hri.getRegionName());
AdminProtos.CloseRegionResponse responseClose = getRS()
.getRSRpcServices().closeRegion(null, crr);
Assert.assertTrue(responseClose.getClosed());
assertTrue(responseClose.getClosed());
checkRegionIsClosed(hri.getEncodedName());
}
private void checkRegionIsOpened(HRegionInfo hri) throws Exception {
private void checkRegionIsOpened(RegionInfo hri) throws Exception {
while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
Thread.sleep(1);
}
}
private boolean isRegionOpened(HRegionInfo hri) throws Exception {
private boolean isRegionOpened(RegionInfo hri) throws Exception {
return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
}
@ -300,7 +336,7 @@ public class TestReplicasClient {
}
try {
Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
} catch (NotServingRegionException expected) {
// That's how it works: if the region is closed we get an exception.
}
@ -308,64 +344,45 @@ public class TestReplicasClient {
// We don't delete the znode here, because there is not always a znode.
}
private void flushRegion(HRegionInfo regionInfo) throws IOException {
private void flushRegion(RegionInfo regionInfo) throws IOException {
TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
}
@Test
public void testUseRegionWithoutReplica() throws Exception {
byte[] b1 = "testUseRegionWithoutReplica".getBytes();
openRegion(hriSecondary);
SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
try {
Get g = new Get(b1);
Result r = table.get(g);
Assert.assertFalse(r.isStale());
} finally {
closeRegion(hriSecondary);
}
assertFalse(r.isStale());
}
@Test
public void testLocations() throws Exception {
byte[] b1 = "testLocations".getBytes();
openRegion(hriSecondary);
ClusterConnection hc = (ClusterConnection) HTU.getAdmin().getConnection();
try {
hc.clearRegionLocationCache();
RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false);
Assert.assertEquals(2, rl.size());
assertEquals(2, rl.size());
rl = hc.locateRegion(table.getName(), b1, true, false);
Assert.assertEquals(2, rl.size());
assertEquals(2, rl.size());
hc.clearRegionLocationCache();
rl = hc.locateRegion(table.getName(), b1, true, false);
Assert.assertEquals(2, rl.size());
assertEquals(2, rl.size());
rl = hc.locateRegion(table.getName(), b1, false, false);
Assert.assertEquals(2, rl.size());
} finally {
closeRegion(hriSecondary);
}
assertEquals(2, rl.size());
}
@Test
public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes();
openRegion(hriSecondary);
try {
// A get works and is not stale
Get g = new Get(b1);
Result r = table.get(g);
Assert.assertFalse(r.isStale());
} finally {
closeRegion(hriSecondary);
assertFalse(r.isStale());
}
}
@Test
public void testGetNoResultStaleRegionWithReplica() throws Exception {
@ -373,39 +390,24 @@ public class TestReplicasClient {
openRegion(hriSecondary);
SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
try {
Get g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
Result r = table.get(g);
Assert.assertTrue(r.isStale());
} finally {
SlowMeCopro.getPrimaryCdl().get().countDown();
closeRegion(hriSecondary);
}
assertTrue(r.isStale());
}
@Test
public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes();
openRegion(hriSecondary);
try {
// We sleep; but we won't go to the stale region as we don't get the stale by default.
SlowMeCopro.sleepTime.set(2000);
Get g = new Get(b1);
Result r = table.get(g);
Assert.assertFalse(r.isStale());
} finally {
SlowMeCopro.sleepTime.set(0);
closeRegion(hriSecondary);
}
assertFalse(r.isStale());
}
@Test
public void testFlushTable() throws Exception {
openRegion(hriSecondary);
try {
flushRegion(hriPrimary);
flushRegion(hriSecondary);
@ -415,18 +417,10 @@ public class TestReplicasClient {
flushRegion(hriPrimary);
flushRegion(hriSecondary);
} finally {
Delete d = new Delete(row);
table.delete(d);
closeRegion(hriSecondary);
}
}
@Test
public void testFlushPrimary() throws Exception {
openRegion(hriSecondary);
try {
flushRegion(hriPrimary);
Put p = new Put(row);
@ -434,17 +428,10 @@ public class TestReplicasClient {
table.put(p);
flushRegion(hriPrimary);
} finally {
Delete d = new Delete(row);
table.delete(d);
closeRegion(hriSecondary);
}
}
@Test
public void testFlushSecondary() throws Exception {
openRegion(hriSecondary);
try {
flushRegion(hriSecondary);
Put p = new Put(row);
@ -452,20 +439,11 @@ public class TestReplicasClient {
table.put(p);
flushRegion(hriSecondary);
} catch (TableNotFoundException expected) {
} finally {
Delete d = new Delete(row);
table.delete(d);
closeRegion(hriSecondary);
}
}
@Test
public void testUseRegionWithReplica() throws Exception {
byte[] b1 = "testUseRegionWithReplica".getBytes();
openRegion(hriSecondary);
try {
// A simple put works, even if there is a second replica
Put p = new Put(b1);
p.addColumn(f, b1, b1);
@ -475,16 +453,16 @@ public class TestReplicasClient {
// A get works and is not stale
Get g = new Get(b1);
Result r = table.get(g);
Assert.assertFalse(r.isStale());
Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
assertFalse(r.isStale());
assertFalse(r.getColumnCells(f, b1).isEmpty());
LOG.info("get works and is not stale done");
// Even if we have to wait a little on the main region
SlowMeCopro.sleepTime.set(2000);
g = new Get(b1);
r = table.get(g);
Assert.assertFalse(r.isStale());
Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
assertFalse(r.isStale());
assertFalse(r.getColumnCells(f, b1).isEmpty());
SlowMeCopro.sleepTime.set(0);
LOG.info("sleep and is not stale done");
@ -493,8 +471,8 @@ public class TestReplicasClient {
g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
assertTrue(r.isStale());
assertTrue(r.getColumnCells(f, b1).isEmpty());
SlowMeCopro.getPrimaryCdl().get().countDown();
LOG.info("stale done");
@ -503,8 +481,8 @@ public class TestReplicasClient {
g = new Get(b1);
g.setCheckExistenceOnly(true);
r = table.get(g);
Assert.assertFalse(r.isStale());
Assert.assertTrue(r.getExists());
assertFalse(r.isStale());
assertTrue(r.getExists());
LOG.info("exists not stale done");
// exists works on stale but don't see the put
@ -513,8 +491,8 @@ public class TestReplicasClient {
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertFalse("The secondary has stale data", r.getExists());
assertTrue(r.isStale());
assertFalse("The secondary has stale data", r.getExists());
SlowMeCopro.getPrimaryCdl().get().countDown();
LOG.info("exists stale before flush done");
@ -528,8 +506,8 @@ public class TestReplicasClient {
g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertFalse(r.isEmpty());
assertTrue(r.isStale());
assertFalse(r.isEmpty());
SlowMeCopro.getPrimaryCdl().get().countDown();
LOG.info("stale done");
@ -539,26 +517,15 @@ public class TestReplicasClient {
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertTrue(r.getExists());
assertTrue(r.isStale());
assertTrue(r.getExists());
SlowMeCopro.getPrimaryCdl().get().countDown();
LOG.info("exists stale after flush done");
} finally {
SlowMeCopro.getPrimaryCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
Delete d = new Delete(b1);
table.delete(d);
closeRegion(hriSecondary);
}
}
@Test
public void testHedgedRead() throws Exception {
byte[] b1 = "testHedgedRead".getBytes();
openRegion(hriSecondary);
try {
// A simple put works, even if there is a second replica
Put p = new Put(b1);
p.addColumn(f, b1, b1);
@ -568,8 +535,8 @@ public class TestReplicasClient {
// A get works and is not stale
Get g = new Get(b1);
Result r = table.get(g);
Assert.assertFalse(r.isStale());
Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
assertFalse(r.isStale());
assertFalse(r.getColumnCells(f, b1).isEmpty());
LOG.info("get works and is not stale done");
// reset
@ -581,117 +548,32 @@ public class TestReplicasClient {
// Wait a little on the main region, just long enough for one hedged read to happen,
// and the hedged read did not return faster
int primaryCallTimeoutMicroSecond = connection.getConnectionConfiguration().getPrimaryCallTimeoutMicroSecond();
int primaryCallTimeoutMicroSecond =
connection.getConnectionConfiguration().getPrimaryCallTimeoutMicroSecond();
SlowMeCopro.sleepTime.set(TimeUnit.MICROSECONDS.toMillis(primaryCallTimeoutMicroSecond));
SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertFalse(r.isStale());
Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
Assert.assertEquals(1, hedgedReadOps.getCount());
Assert.assertEquals(0, hedgedReadWin.getCount());
assertFalse(r.isStale());
assertFalse(r.getColumnCells(f, b1).isEmpty());
assertEquals(1, hedgedReadOps.getCount());
assertEquals(0, hedgedReadWin.getCount());
SlowMeCopro.sleepTime.set(0);
SlowMeCopro.getSecondaryCdl().get().countDown();
LOG.info("hedged read occurred but not faster");
// But if we ask for stale we will get it and hedged read returned faster
SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
Assert.assertEquals(2, hedgedReadOps.getCount());
Assert.assertEquals(1, hedgedReadWin.getCount());
assertTrue(r.isStale());
assertTrue(r.getColumnCells(f, b1).isEmpty());
assertEquals(2, hedgedReadOps.getCount());
assertEquals(1, hedgedReadWin.getCount());
SlowMeCopro.getPrimaryCdl().get().countDown();
LOG.info("hedged read occurred and faster");
} finally {
SlowMeCopro.getPrimaryCdl().get().countDown();
SlowMeCopro.getSecondaryCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
Delete d = new Delete(b1);
table.delete(d);
closeRegion(hriSecondary);
}
}
/**
 * Multi-get cancellation test: writes two rows, flushes the primary region, then submits a
 * two-row, existence-only, TIMELINE-consistency multi-get through {@code AsyncProcess} while a
 * fresh primary latch is armed on {@code SlowMeCopro}.
 * <p>
 * Asserts that both results are stale and report existence, and that the set of calls returned
 * by {@code getCallsInProgress()} is non-empty and every call in it is cancelled.
 * <p>
 * NOTE(review): {@code SlowMeCopro}, {@code REFRESH_PERIOD}, {@code openRegion},
 * {@code flushRegion} and {@code closeRegion} are defined outside this view; the descriptions
 * of what they do below are assumptions to confirm.
 */
@Ignore // Disabled because it is flaky. Fails 17% on constrained GCE, 3% on Apache.
@Test
public void testCancelOfMultiGet() throws Exception {
openRegion(hriSecondary);
try {
// Build one Put per row; each row key doubles as qualifier and value.
List<Put> puts = new ArrayList<>(2);
byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + 0);
Put p = new Put(b1);
p.addColumn(f, b1, b1);
puts.add(p);
byte[] b2 = Bytes.toBytes("testCancelOfMultiGet" + 1);
p = new Put(b2);
p.addColumn(f, b2, b2);
puts.add(p);
table.put(puts);
LOG.debug("PUT done");
flushRegion(hriPrimary);
LOG.info("flush done");
// NOTE(review): presumably waits long enough for the secondary replica to pick up the
// flushed data before reading from it - confirm against REFRESH_PERIOD's definition.
Thread.sleep(1000 + REFRESH_PERIOD * 2);
AsyncProcess ap = ((ClusterConnection) HTU.getConnection()).getAsyncProcess();
// Slow down the primary: arm a fresh latch (it is counted down in the finally block)
SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
// Two existence-only Gets, both allowing TIMELINE consistency.
List<Get> gets = new ArrayList<>();
Get g = new Get(b1);
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
gets.add(g);
g = new Get(b2);
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
gets.add(g);
// One slot per Get; handed to the task via setResults and read back after waitUntilDone().
Object[] results = new Object[2];
int operationTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getOperationTimeout();
int readTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getReadRpcTimeout();
AsyncProcessTask task = AsyncProcessTask.newBuilder()
.setPool(HTable.getDefaultExecutor(HTU.getConfiguration()))
.setTableName(table.getName())
.setRowAccess(gets)
.setResults(results)
.setOperationTimeout(operationTimeout)
.setRpcTimeout(readTimeout)
.build();
AsyncRequestFuture reqs = ap.submit(task);
reqs.waitUntilDone();
// verify we got the right results back: each entry is cast to Result and must be stale
// and report that the row exists
for (Object r : results) {
Assert.assertTrue(((Result)r).isStale());
Assert.assertTrue(((Result)r).getExists());
}
Set<CancellableRegionServerCallable> set =
((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress();
// verify we did cancel unneeded calls: the set must be non-empty and fully cancelled
Assert.assertTrue(!set.isEmpty());
for (CancellableRegionServerCallable m : set) {
Assert.assertTrue(m.isCancelled());
}
} finally {
// Always release the primary latch and reset the static SlowMeCopro settings.
SlowMeCopro.getPrimaryCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
// Delete both rows written above (same key prefix, suffixes 0 and 1).
for (int i = 0; i < 2; i++) {
byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + i);
Delete d = new Delete(b1);
table.delete(d);
}
closeRegion(hriSecondary);
}
}
@Test
@ -714,10 +596,8 @@ public class TestReplicasClient {
@Test
public void testCancelOfScan() throws Exception {
openRegion(hriSecondary);
int NUMROWS = 100;
try {
for (int i = 0; i < NUMROWS; i++) {
int numRows = 100;
for (int i = 0; i < numRows; i++) {
byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
Put p = new Put(b1);
p.addColumn(f, b1, b1);
@ -737,37 +617,45 @@ public class TestReplicasClient {
SlowMeCopro.countOfNext.set(0);
SlowMeCopro.sleepTime.set(5000);
Scan scan = new Scan(start);
Scan scan = new Scan().withStartRow(start);
scan.setCaching(caching);
scan.setConsistency(Consistency.TIMELINE);
ResultScanner scanner = table.getScanner(scan);
Iterator<Result> iter = scanner.iterator();
iter.next();
Assert.assertTrue(((ClientScanner)scanner).isAnyRPCcancelled());
assertTrue(((ClientScanner) scanner).isAnyRPCcancelled());
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
} finally {
SlowMeCopro.getPrimaryCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
for (int i = 0; i < NUMROWS; i++) {
byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
Delete d = new Delete(b1);
table.delete(d);
}
closeRegion(hriSecondary);
/**
 * Makes sure a scan pinned to replica 1 only goes to that replica: after pulling one result,
 * the secondary scan counter must be positive and the primary scan counter must be zero.
 */
@Test
public void testScanOnSpecificReplica() throws Exception {
  Scan replicaScan = new Scan();
  replicaScan.setReplicaId(1);
  replicaScan.setConsistency(Consistency.TIMELINE);
  try (ResultScanner results = table.getScanner(replicaScan)) {
    // Pull a single result; its content is not inspected, only the scan counters are.
    results.next();
  }
  assertTrue(0 < SlowMeCopro.secondaryCountOfScan.get());
  assertEquals(0, SlowMeCopro.primaryCountOfScan.get());
}
/**
 * Makes sure a reversed scan pinned to replica 1 only goes to that replica: after pulling one
 * result, the secondary scan counter must be positive and the primary scan counter must be
 * zero.
 */
@Test
public void testReverseScanOnSpecificReplica() throws Exception {
  Scan reversedReplicaScan = new Scan();
  reversedReplicaScan.setReversed(true);
  reversedReplicaScan.setReplicaId(1);
  reversedReplicaScan.setConsistency(Consistency.TIMELINE);
  try (ResultScanner results = table.getScanner(reversedReplicaScan)) {
    // Pull a single result; its content is not inspected, only the scan counters are.
    results.next();
  }
  assertTrue(0 < SlowMeCopro.secondaryCountOfScan.get());
  assertEquals(0, SlowMeCopro.primaryCountOfScan.get());
}
private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
openRegion(hriSecondary);
int NUMROWS = 100;
int NUMCOLS = 10;
try {
for (int i = 0; i < NUMROWS; i++) {
int numRows = 100;
int numCols = 10;
for (int i = 0; i < numRows; i++) {
byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
for (int col = 0; col < NUMCOLS; col++) {
for (int col = 0; col < numCols; col++) {
Put p = new Put(b1);
String qualifier = "qualifer" + col;
KeyValue kv = new KeyValue(b1, f, qualifier.getBytes());
@ -780,17 +668,20 @@ public class TestReplicasClient {
long maxResultSize = Long.MAX_VALUE;
byte[] start;
if (reversed) start = Bytes.toBytes("testUseRegionWithReplica" + (NUMROWS - 1));
else start = Bytes.toBytes("testUseRegionWithReplica" + 0);
if (reversed) {
start = Bytes.toBytes("testUseRegionWithReplica" + (numRows - 1));
} else {
start = Bytes.toBytes("testUseRegionWithReplica" + 0);
}
scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
start, NUMROWS, NUMCOLS, false, false);
scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows,
numCols, false, false);
// Even if we were to slow the server down, unless we ask for stale
// we won't get it
SlowMeCopro.sleepTime.set(5000);
scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, NUMROWS,
NUMCOLS, false, false);
scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, numRows,
numCols, false, false);
SlowMeCopro.sleepTime.set(0);
flushRegion(hriPrimary);
@ -799,22 +690,22 @@ public class TestReplicasClient {
// Now set the flag to get a response even if stale
SlowMeCopro.sleepTime.set(5000);
scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
start, NUMROWS, NUMCOLS, true, false);
scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows,
numCols, true, false);
SlowMeCopro.sleepTime.set(0);
// now make some 'next' calls slow
SlowMeCopro.slowDownNext.set(true);
SlowMeCopro.countOfNext.set(0);
scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
NUMROWS, NUMCOLS, true, true);
scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows,
numCols, true, true);
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
// Make sure we do not get stale data..
SlowMeCopro.sleepTime.set(5000);
scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize,
start, NUMROWS, NUMCOLS, false, false);
scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, numRows,
numCols, false, false);
SlowMeCopro.sleepTime.set(0);
// While the next calls are slow, set maxResultSize to 1 so that some partial results will be
@ -822,30 +713,18 @@ public class TestReplicasClient {
maxResultSize = 1;
SlowMeCopro.slowDownNext.set(true);
SlowMeCopro.countOfNext.set(0);
scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
NUMROWS, NUMCOLS, true, true);
scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows,
numCols, true, true);
maxResultSize = Long.MAX_VALUE;
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
} finally {
SlowMeCopro.getPrimaryCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
for (int i = 0; i < NUMROWS; i++) {
byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
Delete d = new Delete(b1);
table.delete(d);
}
closeRegion(hriSecondary);
}
}
private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency,
int caching, long maxResultSize, byte[] startRow, int numRows, int numCols,
boolean staleExpected, boolean slowNext)
throws Exception {
Scan scan = new Scan(startRow);
Scan scan = new Scan().withStartRow(startRow);
scan.setCaching(caching);
scan.setMaxResultSize(maxResultSize);
scan.setReversed(reversed);
@ -872,29 +751,30 @@ public class TestReplicasClient {
}
map.put(row, true);
cellCount += r.rawCells().length;
for (Cell cell : r.rawCells()) {
cellCount++;
if (!slowNext) {
assertTrue(r.isStale() == staleExpected);
}
if (!slowNext) Assert.assertTrue(r.isStale() == staleExpected);
if (r.isStale()) countOfStale++;
if (r.isStale()) {
countOfStale++;
}
Assert.assertTrue("Count of rows " + rowCount + " num rows expected " + numRows,
}
assertTrue("Count of rows " + rowCount + " num rows expected " + numRows,
rowCount == numRows);
Assert.assertTrue("Count of cells: " + cellCount + " cells expected: " + numRows * numCols,
assertTrue("Count of cells: " + cellCount + " cells expected: " + numRows * numCols,
cellCount == (numRows * numCols));
if (slowNext) {
LOG.debug("Count of Stale " + countOfStale);
Assert.assertTrue(countOfStale > 1);
assertTrue(countOfStale > 1);
// If the scan was configured in such a way that a full row was NOT retrieved before the
// replica switch occurred, then it is possible that all rows were stale
if (maxResultSize != Long.MAX_VALUE) {
Assert.assertTrue(countOfStale <= numRows);
assertTrue(countOfStale <= numRows);
} else {
Assert.assertTrue(countOfStale < numRows);
assertTrue(countOfStale < numRows);
}
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
@ -129,8 +130,11 @@ public class TestRegionServerNoMaster {
}
}
/** Flush the given region in the mini cluster. Since no master, we cannot use HBaseAdmin.flush() */
public static void flushRegion(HBaseTestingUtility HTU, HRegionInfo regionInfo) throws IOException {
/**
* Flush the given region in the mini cluster. Since no master, we cannot use HBaseAdmin.flush()
*/
public static void flushRegion(HBaseTestingUtility HTU, RegionInfo regionInfo)
throws IOException {
for (RegionServerThread rst : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
HRegion region = rst.getRegionServer().getRegionByEncodedName(regionInfo.getEncodedName());
if (region != null) {