HBASE-21663 Add replica scan support

This commit is contained in:
Duo Zhang 2019-01-11 10:49:33 +08:00
parent 8b9bfa0df5
commit a09dffd106
17 changed files with 549 additions and 371 deletions

View File

@ -19,11 +19,12 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
/**
@ -40,7 +41,7 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
private final Callable<T> callable;
private ServerName serverName;
public AsyncAdminRequestRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
@ -69,10 +70,4 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
future.complete(result);
});
}
/**
 * Starts this caller by invoking {@code doCall()} once and then returns the {@code future}
 * field.
 * <p>
 * NOTE(review): {@code future} and {@code doCall()} are not declared in this view; presumably
 * the future is the one completed from the RPC callbacks of this class -- confirm against the
 * parent class.
 */
@Override
CompletableFuture<T> call() {
doCall();
return future;
}
}

View File

@ -56,7 +56,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@ -80,7 +80,7 @@ class AsyncBatchRpcRetryingCaller<T> {
private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class);
private final HashedWheelTimer retryTimer;
private final Timer retryTimer;
private final AsyncConnectionImpl conn;
@ -130,7 +130,7 @@ class AsyncBatchRpcRetryingCaller<T> {
}
}
public AsyncBatchRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, List<? extends Row> actions, long pauseNs, int maxAttempts,
long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;

View File

@ -19,17 +19,27 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
@ -59,6 +69,8 @@ class AsyncClientScanner {
private final AsyncConnectionImpl conn;
private final Timer retryTimer;
private final long pauseNs;
private final int maxAttempts;
@ -72,7 +84,7 @@ class AsyncClientScanner {
private final ScanResultCache resultCache;
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long scanTimeoutNs,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, int maxAttempts, long scanTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
if (scan.getStartRow() == null) {
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
@ -84,6 +96,7 @@ class AsyncClientScanner {
this.consumer = consumer;
this.tableName = tableName;
this.conn = conn;
this.retryTimer = retryTimer;
this.pauseNs = pauseNs;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
@ -120,20 +133,19 @@ class AsyncClientScanner {
}
}
private int openScannerTries;
private final AtomicInteger openScannerTries = new AtomicInteger();
private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub) {
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
if (openScannerTries > 1) {
if (openScannerTries.getAndIncrement() > 1) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
openScannerTries++;
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(),
scan, scan.getCaching(), false);
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan,
scan.getCaching(), false);
stub.scan(controller, request, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
@ -148,40 +160,53 @@ class AsyncClientScanner {
}
private void startScan(OpenScannerResponse resp) {
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
.remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.start(resp.controller, resp.resp).whenComplete((hasMore, error) -> {
if (error != null) {
consumer.onError(error);
return;
}
if (hasMore) {
openScanner();
} else {
consumer.onComplete();
}
});
addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId())
.location(resp.loc).remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.start(resp.controller, resp.resp), (hasMore, error) -> {
if (error != null) {
consumer.onError(error);
return;
}
if (hasMore) {
openScanner();
} else {
consumer.onComplete();
}
});
}
/**
 * Opens a scanner against one specific replica of the region that holds the scan's start row.
 * The request goes through the single-request retrying caller and uses
 * {@link #callOpenScanner} as the RPC action.
 * @param replicaId the region replica to send the open scanner request to
 * @return a future for the open scanner response of that replica
 */
private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
  byte[] startRow = scan.getStartRow();
  RegionLocateType locateType = getLocateType(scan);
  return conn.callerFactory.<OpenScannerResponse> single()
    .table(tableName)
    .row(startRow)
    .replicaId(replicaId)
    .locateType(locateType)
    .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
    .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
    .pause(pauseNs, TimeUnit.NANOSECONDS)
    .maxAttempts(maxAttempts)
    .startLogErrorsCnt(startLogErrorsCnt)
    .action(this::callOpenScanner)
    .call();
}
/**
 * Selects the primary timeout, in nanoseconds, for the table being scanned: the meta specific
 * value when {@code tableName} is the meta table, the general scan value otherwise.
 */
private long getPrimaryTimeoutNs() {
  if (TableName.isMetaTableName(tableName)) {
    return conn.connConf.getPrimaryMetaScanTimeoutNs();
  }
  return conn.connConf.getPrimaryScanTimeoutNs();
}
private void openScanner() {
incRegionCountMetrics(scanMetrics);
openScannerTries = 1;
conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
.locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
.call().whenComplete((resp, error) -> {
if (error != null) {
consumer.onError(error);
return;
}
startScan(resp);
});
openScannerTries.set(1);
addListener(
timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer),
(resp, error) -> {
if (error != null) {
consumer.onError(error);
return;
}
startScan(resp);
});
}
public void start() {

View File

@ -26,6 +26,8 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TI
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
@ -41,6 +43,8 @@ import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRO
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
@ -100,6 +104,10 @@ class AsyncConnectionConfiguration {
// timeout, we will send request to secondaries.
private final long primaryCallTimeoutNs;
private final long primaryScanTimeoutNs;
private final long primaryMetaScanTimeoutNs;
@SuppressWarnings("deprecation")
AsyncConnectionConfiguration(Configuration conf) {
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
@ -132,6 +140,11 @@ class AsyncConnectionConfiguration {
WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT));
this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
conf.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT));
this.primaryMetaScanTimeoutNs =
TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
}
long getMetaOperationTimeoutNs() {
@ -193,4 +206,12 @@ class AsyncConnectionConfiguration {
/**
 * Returns the primary call timeout in nanoseconds. The constructor reads it from
 * {@code PRIMARY_CALL_TIMEOUT_MICROSECOND} as a microsecond value and converts it.
 */
long getPrimaryCallTimeoutNs() {
return primaryCallTimeoutNs;
}
/**
 * Returns the primary scan timeout in nanoseconds. The constructor reads it from
 * {@code PRIMARY_SCAN_TIMEOUT_MICROSECOND} as a microsecond value and converts it.
 */
long getPrimaryScanTimeoutNs() {
return primaryScanTimeoutNs;
}
/**
 * Returns the primary scan timeout for the meta table in nanoseconds. The constructor reads it
 * from {@code HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT} as a microsecond value and converts it.
 */
long getPrimaryMetaScanTimeoutNs() {
return primaryMetaScanTimeoutNs;
}
}

View File

@ -275,7 +275,7 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override
public AsyncTable<AdvancedScanResultConsumer> build() {
return new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
}
};
}
@ -287,7 +287,8 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override
public AsyncTable<ScanResultConsumer> build() {
RawAsyncTableImpl rawTable = new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
RawAsyncTableImpl rawTable =
new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
}
};

View File

@ -17,12 +17,12 @@
*/
package org.apache.hadoop.hbase.client;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import java.util.concurrent.CompletableFuture;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
/**
@ -39,7 +39,7 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
private final Callable<T> callable;
public AsyncMasterRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
Callable<T> callable, long pauseNs, int maxRetries, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
@ -66,10 +66,4 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
});
});
}
/**
 * Starts this caller by invoking {@code doCall()} once and then returns the {@code future}
 * field.
 * <p>
 * NOTE(review): {@code future} and {@code doCall()} are not declared in this view; presumably
 * the future is completed from the RPC callbacks of this class -- confirm against the parent
 * class.
 */
@Override
public CompletableFuture<T> call() {
doCall();
return future;
}
}

View File

@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import java.util.ArrayList;
import java.util.List;
@ -31,20 +30,21 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
@InterfaceAudience.Private
public abstract class AsyncRpcRetryingCaller<T> {
private static final Logger LOG = LoggerFactory.getLogger(AsyncRpcRetryingCaller.class);
private final HashedWheelTimer retryTimer;
private final Timer retryTimer;
private final long startNs;
@ -68,9 +68,8 @@ public abstract class AsyncRpcRetryingCaller<T> {
protected final HBaseRpcController controller;
public AsyncRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
long pauseNs, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs,
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.pauseNs = pauseNs;

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
@ -45,9 +45,9 @@ class AsyncRpcRetryingCallerFactory {
private final AsyncConnectionImpl conn;
private final HashedWheelTimer retryTimer;
private final Timer retryTimer;
public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) {
this.conn = conn;
this.retryTimer = retryTimer;
}

View File

@ -34,7 +34,6 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
@ -49,9 +48,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@ -72,7 +73,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private static final Logger LOG =
LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);
private final HashedWheelTimer retryTimer;
private final Timer retryTimer;
private final Scan scan;
@ -297,7 +298,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
}
public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer,
AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId,
ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub,
HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs,

View File

@ -17,14 +17,14 @@
*/
package org.apache.hadoop.hbase.client;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
/**
@ -42,7 +42,7 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
private final Callable<T> callable;
private ServerName serverName;
public AsyncServerRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
@ -71,10 +71,4 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
future.complete(result);
});
}
/**
 * Starts this caller by invoking {@code doCall()} once and then returns the {@code future}
 * field.
 * <p>
 * NOTE(review): {@code future} and {@code doCall()} are not declared in this view; presumably
 * the future is the one completed from the RPC callbacks of this class -- confirm against the
 * parent class.
 */
@Override
CompletableFuture<T> call() {
doCall();
return future;
}
}

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
@ -53,7 +53,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
private final Callable<T> callable;
public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, byte[] row, int replicaId, RegionLocateType locateType,
Callable<T> callable, long pauseNs, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
@ -114,10 +114,4 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
call(loc);
});
}
/**
 * Starts this caller by invoking {@code doCall()} once and then returns the {@code future}
 * field.
 * <p>
 * NOTE(review): {@code future} and {@code doCall()} are not declared in this view; presumably
 * the future is completed once the located region has been called -- confirm against the parent
 * class.
 */
@Override
public CompletableFuture<T> call() {
doCall();
return future;
}
}

View File

@ -41,6 +41,9 @@ public class ConnectionConfiguration {
public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND =
"hbase.client.primaryCallTimeout.get";
public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 10ms
public static final String PRIMARY_SCAN_TIMEOUT_MICROSECOND =
"hbase.client.replicaCallTimeout.scan";
public static final int PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT = 1000000; // 1s
private final long writeBufferSize;
private final long writeBufferPeriodicFlushTimeoutMs;
@ -92,11 +95,11 @@ public class ConnectionConfiguration {
conf.getInt(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT);
this.replicaCallTimeoutMicroSecondScan =
conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms
conf.getInt(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT);
this.metaReplicaCallTimeoutMicroSecondScan =
conf.getInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT);
conf.getInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT);
this.retries = conf.getInt(
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
@ -31,11 +32,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
@ -53,6 +56,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@ -123,9 +127,8 @@ public final class ConnectionUtils {
}
/**
* A ClusterConnection that will short-circuit RPC making direct invocations against the
* localhost if the invocation target is 'this' server; save on network and protobuf
* invocations.
* A ClusterConnection that will short-circuit RPC making direct invocations against the localhost
* if the invocation target is 'this' server; save on network and protobuf invocations.
*/
// TODO This has to still do PB marshalling/unmarshalling stuff. Check how/whether we can avoid.
@VisibleForTesting // Class is visible so can assert we are short-circuiting when expected.
@ -136,8 +139,7 @@ public final class ConnectionUtils {
private ShortCircuitingClusterConnection(Configuration conf, ExecutorService pool, User user,
ServerName serverName, AdminService.BlockingInterface admin,
ClientService.BlockingInterface client)
throws IOException {
ClientService.BlockingInterface client) throws IOException {
super(conf, pool, user);
this.serverName = serverName;
this.localHostAdmin = admin;
@ -157,7 +159,8 @@ public final class ConnectionUtils {
@Override
public MasterKeepAliveConnection getMaster() throws IOException {
if (this.localHostClient instanceof MasterService.BlockingInterface) {
return new ShortCircuitMasterConnection((MasterService.BlockingInterface)this.localHostClient);
return new ShortCircuitMasterConnection(
(MasterService.BlockingInterface) this.localHostClient);
}
return super.getMaster();
}
@ -335,8 +338,8 @@ public final class ConnectionUtils {
return result;
}
Cell[] rawCells = result.rawCells();
int index =
Arrays.binarySearch(rawCells, keepCellsAfter, CellComparator.getInstance()::compareWithoutRow);
int index = Arrays.binarySearch(rawCells, keepCellsAfter,
CellComparator.getInstance()::compareWithoutRow);
if (index < 0) {
index = -index - 1;
} else {
@ -406,7 +409,7 @@ public final class ConnectionUtils {
static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
.thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
}
public static ScanResultCache createScanResultCache(Scan scan) {
@ -489,4 +492,84 @@ public final class ConnectionUtils {
}
scanMetrics.countOfRegions.incrementAndGet();
}
/**
 * Links two futures for a timeline consistent read: whatever outcome {@code srcFuture} reaches is
 * forwarded to {@code dstFuture}, and as soon as {@code dstFuture} is done {@code srcFuture} is
 * cancelled.
 */
private static <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture) {
  // Forward the source outcome to the destination, success or failure alike.
  addListener(srcFuture, (value, failure) -> {
    if (failure == null) {
      dstFuture.complete(value);
    } else {
      dstFuture.completeExceptionally(failure);
    }
  });
  // The cancellation is often a dummy one, since dstFuture may have been completed by this very
  // srcFuture. This is a bit tricky, as the execution chain may be 'complete src -> complete dst
  // -> cancel src'. For now it seems to be fine, as they will use CAS to set the result first in
  // CompletableFuture. If this causes problems later, whenCompleteAsync could be used to break
  // the tie.
  addListener(dstFuture, (ignoredValue, ignoredFailure) -> srcFuture.cancel(false));
}
/**
 * Issues the request against every replica id from 1 up to {@code locs.size() - 1} and connects
 * each resulting future to {@code future}. Nothing is sent when {@code future} is already done,
 * i.e. the primary request has already finished.
 * @param requestReplica sends the request to the replica with the given id
 * @param locs the located replicas; only its size is used here
 * @param future the future handed to the caller, completed by whichever replica finishes first
 */
private static <T> void sendRequestsToSecondaryReplicas(
    Function<Integer, CompletableFuture<T>> requestReplica, RegionLocations locs,
    CompletableFuture<T> future) {
  if (!future.isDone()) {
    // The loop starts at 1: the caller has already queried the default replica as the primary.
    int replicaCount = locs.size();
    for (int id = 1; id < replicaCount; id++) {
      connect(requestReplica.apply(id), future);
    }
  }
}
/**
 * Runs a read that may fall back to secondary region replicas.
 * <ul>
 * <li>With {@link Consistency#STRONG} the request goes to the default replica only.</li>
 * <li>With an explicit, non-negative replica id on the query, the request goes to that replica
 * only.</li>
 * <li>Otherwise the default replica is asked first. If the returned future is still not done
 * once {@code primaryCallTimeoutNs} has elapsed, the same request is also sent to every other
 * replica and the first outcome wins.</li>
 * </ul>
 * When the replicas cannot be located, or only one location is known, the read simply relies on
 * the primary request.
 * @param locator used to look up all replica locations for {@code row}
 * @param tableName the table being read
 * @param query supplies the consistency level and the optional replica id
 * @param row the row used to locate the region
 * @param locateType how {@code row} is matched against region boundaries
 * @param requestReplica sends the request to the replica with the given id
 * @param rpcTimeoutNs timeout passed to the region location lookup
 * @param primaryCallTimeoutNs how long to wait for the primary before asking the secondaries
 * @param retryTimer timer used to schedule the delayed requests to the secondaries
 * @return a future completed by the first replica that finishes
 */
static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator locator,
    TableName tableName, Query query, byte[] row, RegionLocateType locateType,
    Function<Integer, CompletableFuture<T>> requestReplica, long rpcTimeoutNs,
    long primaryCallTimeoutNs, Timer retryTimer) {
  if (query.getConsistency() == Consistency.STRONG) {
    return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
  }
  // The user named a replica explicitly, so only that replica is asked.
  int requestedReplicaId = query.getReplicaId();
  if (requestedReplicaId >= 0) {
    return requestReplica.apply(requestedReplicaId);
  }
  // Timeline consistent read: start with the primary, the other replicas may follow later.
  CompletableFuture<T> result = new CompletableFuture<>();
  connect(requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID), result);
  long startedNs = System.nanoTime();
  // Once getRegionLocations has returned, the locations of all the replicas of this region should
  // be cached, so locating them again when the secondary requests are sent is cheap.
  addListener(locator.getRegionLocations(tableName, row, locateType, false, rpcTimeoutNs),
    (replicaLocs, locateError) -> {
      if (locateError != null) {
        LOG.warn(
          "Failed to locate all the replicas for table={}, row='{}', locateType={}" +
            " give up timeline consistent read",
          tableName, Bytes.toStringBinary(row), locateType, locateError);
        return;
      }
      if (replicaLocs.size() <= 1) {
        LOG.warn(
          "There are no secondary replicas for region {}, give up timeline consistent read",
          replicaLocs.getDefaultRegionLocation().getRegion());
        return;
      }
      // Only the part of the primary timeout not already spent on the lookup is waited for.
      long remainingNs = primaryCallTimeoutNs - (System.nanoTime() - startedNs);
      if (remainingNs > 0) {
        retryTimer.newTimeout(
          timeout -> sendRequestsToSecondaryReplicas(requestReplica, replicaLocs, result),
          remainingNs, TimeUnit.NANOSECONDS);
      } else {
        sendRequestsToSecondaryReplicas(requestReplica, replicaLocs, result);
      }
    });
  return result;
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.RpcChannel;
@ -36,7 +37,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
import org.apache.hadoop.hbase.filter.BinaryComparator;
@ -45,11 +45,10 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@ -77,10 +76,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
@InterfaceAudience.Private
class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
private final AsyncConnectionImpl conn;
private final Timer retryTimer;
private final TableName tableName;
private final int defaultScannerCaching;
@ -103,8 +102,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private final int startLogErrorsCnt;
RawAsyncTableImpl(AsyncConnectionImpl conn, AsyncTableBuilderBase<?> builder) {
RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) {
this.conn = conn;
this.retryTimer = retryTimer;
this.tableName = builder.tableName;
this.rpcTimeoutNs = builder.rpcTimeoutNs;
this.readRpcTimeoutNs = builder.readRpcTimeoutNs;
@ -219,8 +219,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
return newCaller(row.getRow(), rpcTimeoutNs);
}
private CompletableFuture<Result> get(Get get, int replicaId, long timeoutNs) {
return this.<Result> newCaller(get, timeoutNs)
private CompletableFuture<Result> get(Get get, int replicaId) {
return this.<Result> newCaller(get, readRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl
.<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
@ -228,78 +228,11 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.replicaId(replicaId).call();
}
// Links two futures for a timeline consistent read:
// - when srcFuture finishes, its outcome (value or exception) is forwarded to dstFuture;
// - when dstFuture finishes, for whatever reason, srcFuture is cancelled.
// Several source futures (primary and secondary replica reads) can be connected to the same
// dstFuture, so the first one to finish decides the result and the others get cancelled.
private <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture) {
// Forward the outcome of the source to the destination.
addListener(srcFuture, (r, e) -> {
if (e != null) {
dstFuture.completeExceptionally(e);
} else {
dstFuture.complete(r);
}
});
// The cancellation may be a no-op, as dstFuture may have been completed by this very srcFuture.
// Note that this is a bit tricky, as the execution chain may be 'complete src -> complete dst
// -> cancel src'. For now this seems to be fine, as CompletableFuture uses CAS to set the result
// first, so the later cancel cannot overwrite it. If this causes problems later, we could use
// whenCompleteAsync to break the tie.
addListener(dstFuture, (r, e) -> srcFuture.cancel(false));
}
/**
 * Fans the given get out to every secondary replica and ties each secondary read to
 * {@code future}.
 * @param get the get to send
 * @param locs the located replicas of the target region; index 0 is skipped, only replica ids
 *          {@code 1 .. locs.size() - 1} are contacted
 * @param future the future handed to the caller; nothing is sent if it is already done
 */
private void timelineConsistentGet(Get get, RegionLocations locs,
    CompletableFuture<Result> future) {
  // Only bother the secondary replicas while the caller is still waiting, i.e. the primary
  // request has not finished yet.
  if (!future.isDone()) {
    int replicaCount = locs.size();
    for (int secondaryId = 1; secondaryId < replicaCount; secondaryId++) {
      connect(get(get, secondaryId, readRpcTimeoutNs), future);
    }
  }
}
@Override
public CompletableFuture<Result> get(Get get) {
if (get.getConsistency() == Consistency.STRONG) {
return get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
}
// user specifies a replica id explicitly, just send request to the specific replica
if (get.getReplicaId() >= 0) {
return get(get, get.getReplicaId(), readRpcTimeoutNs);
}
// Timeline consistent read, where we may send requests to other region replicas
CompletableFuture<Result> primaryFuture =
get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
CompletableFuture<Result> future = new CompletableFuture<>();
connect(primaryFuture, future);
long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
long startNs = System.nanoTime();
addListener(conn.getLocator().getRegionLocations(tableName, get.getRow(),
RegionLocateType.CURRENT, false, readRpcTimeoutNs), (locs, error) -> {
if (error != null) {
LOG.warn(
"Failed to locate all the replicas for table={}, row='{}'," +
" give up timeline consistent read",
tableName, Bytes.toStringBinary(get.getRow()), error);
return;
}
if (locs.size() <= 1) {
LOG.warn(
"There are no secondary replicas for region {}," + " give up timeline consistent read",
locs.getDefaultRegionLocation().getRegion());
return;
}
long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
if (delayNs <= 0) {
timelineConsistentGet(get, locs, future);
} else {
AsyncConnectionImpl.RETRY_TIMER.newTimeout(
timeout -> timelineConsistentGet(get, locs, future), delayNs, TimeUnit.NANOSECONDS);
}
});
return future;
return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
conn.connConf.getPrimaryCallTimeoutNs(), retryTimer);
}
@Override
@ -494,8 +427,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs,
maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
pauseNs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
}
private long resultSize2CacheSize(long maxResultSize) {

View File

@ -0,0 +1,232 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
/**
 * Shared scaffolding for tests that read a table with region replicas through the async client.
 * <p>
 * The class starts a mini cluster, creates {@link #TABLE_NAME} with {@link #REPLICA_COUNT}
 * replicas and installs {@link FailPrimaryGetCP}, which counts read requests per replica and can
 * fail the reads that hit the primary replica. Subclasses load the data and implement
 * {@link #readAndCheck(AsyncTable, int)} with the concrete read operation. Every test runs twice,
 * once per table flavour returned by {@link #params()}.
 */
public abstract class AbstractTestAsyncTableRegionReplicasRead {

  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  protected static TableName TABLE_NAME = TableName.valueOf("async");

  protected static byte[] FAMILY = Bytes.toBytes("cf");

  protected static byte[] QUALIFIER = Bytes.toBytes("cq");

  protected static byte[] ROW = Bytes.toBytes("row");

  protected static byte[] VALUE = Bytes.toBytes("value");

  /** Region replication configured for the test table. */
  protected static int REPLICA_COUNT = 3;

  /** Connection shared by all tests; created in {@link #startClusterAndCreateTable()}. */
  protected static AsyncConnection ASYNC_CONN;

  @Rule
  public TestName testName = new TestName();

  /** Injected by the Parameterized runner; supplies the table flavour under test. */
  @Parameter
  public Supplier<AsyncTable<?>> getTable;

  private static AsyncTable<?> getRawTable() {
    return ASYNC_CONN.getTable(TABLE_NAME);
  }

  private static AsyncTable<?> getTable() {
    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
  }

  /**
   * @return the two table suppliers to run every test with: the table obtained without an
   *         executor and the one backed by the common fork-join pool
   */
  @Parameters
  public static List<Object[]> params() {
    return Arrays.asList(
      new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getRawTable },
      new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getTable });
  }

  /** While {@code true}, {@link FailPrimaryGetCP} rejects reads that reach the primary replica. */
  protected static volatile boolean FAIL_PRIMARY_GET = false;

  /** Number of read requests observed by {@link FailPrimaryGetCP}, keyed by replica id. */
  protected static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
    new ConcurrentHashMap<>();

  /**
   * Region coprocessor that records every get and scanner-open request against
   * {@link #TABLE_NAME} per replica id and, while {@link #FAIL_PRIMARY_GET} is set, fails the
   * requests served by the primary replica.
   */
  public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    /**
     * Counts the request for the replica of the current region and injects the failure if asked.
     * @throws IOException if the region is the primary replica and {@link #FAIL_PRIMARY_GET} is
     *           set
     */
    private void recordAndTryFail(ObserverContext<RegionCoprocessorEnvironment> c)
      throws IOException {
      RegionInfo region = c.getEnvironment().getRegionInfo();
      // Requests to other tables (e.g. system tables) are neither counted nor failed.
      if (!region.getTable().equals(TABLE_NAME)) {
        return;
      }
      int replicaId = region.getReplicaId();
      REPLICA_ID_TO_COUNT.computeIfAbsent(replicaId, k -> new AtomicInteger()).incrementAndGet();
      // The primary has to be recognised by its replica id. The region id is not a replica
      // index, so comparing it with DEFAULT_REPLICA_ID would never match and the error would
      // never be injected.
      if (replicaId == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) {
        throw new IOException("Inject error");
      }
    }

    @Override
    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
      List<Cell> result) throws IOException {
      recordAndTryFail(c);
    }

    @Override
    public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
      throws IOException {
      recordAndTryFail(c);
    }
  }

  /**
   * @return {@code true} if every online region of {@link #TABLE_NAME} on every region server
   *         returns a non-empty result for {@code row}
   */
  private static boolean allReplicasHaveRow(byte[] row) throws IOException {
    for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
      for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
        if (region.get(new Get(row), false).isEmpty()) {
          return false;
        }
      }
    }
    return true;
  }

  /**
   * Configures the client timeouts, starts a three node mini cluster, creates the replicated
   * test table with {@link FailPrimaryGetCP} attached and opens {@link #ASYNC_CONN}.
   */
  protected static void startClusterAndCreateTable() throws Exception {
    // 10 mins
    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
      TimeUnit.MINUTES.toMillis(10));
    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
      TimeUnit.MINUTES.toMillis(10));
    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      TimeUnit.MINUTES.toMillis(10));

    // 1 second
    TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND,
      TimeUnit.SECONDS.toMicros(1));
    TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND,
      TimeUnit.SECONDS.toMicros(1));

    // set a small pause so we will retry very quickly
    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);

    // infinite retry
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);

    TEST_UTIL.startMiniCluster(3);
    TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
      .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
  }

  /**
   * Blocks until {@code row} can be read from every replica of the test table, waiting at most
   * 30 seconds after the table has been re-enabled.
   */
  protected static void waitUntilAllReplicasHaveRow(byte[] row) throws IOException {
    // this is the fastest way to let all replicas have the row
    TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
    TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
    TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow(row));
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    IOUtils.closeQuietly(ASYNC_CONN);
    TEST_UTIL.shutdownMiniCluster();
  }

  /** @return the total number of recorded requests that hit a non-primary replica */
  protected static int getSecondaryGetCount() {
    return REPLICA_ID_TO_COUNT.entrySet().stream()
      .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
      .mapToInt(e -> e.getValue().get()).sum();
  }

  /** @return the number of recorded requests that hit the primary replica, 0 if none yet */
  protected static int getPrimaryGetCount() {
    AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
    return primaryGetCount != null ? primaryGetCount.get() : 0;
  }

  /**
   * Performs the read under test and verifies the returned data.
   * @param table the table to read from
   * @param replicaId the replica to read from; -1 means do not set a replica on the request
   */
  protected abstract void readAndCheck(AsyncTable<?> table, int replicaId) throws Exception;

  @Test
  public void testNoReplicaRead() throws Exception {
    FAIL_PRIMARY_GET = false;
    REPLICA_ID_TO_COUNT.clear();
    AsyncTable<?> table = getTable.get();
    readAndCheck(table, -1);
    // the primary region is fine and the primary timeout is 1 second which is long enough, so we
    // should not send any requests to secondary replicas even if the consistency is timeline.
    Thread.sleep(5000);
    assertEquals(0, getSecondaryGetCount());
  }

  @Test
  public void testReplicaRead() throws Exception {
    // fail the primary get request
    FAIL_PRIMARY_GET = true;
    REPLICA_ID_TO_COUNT.clear();
    // make sure that we could still get the value from secondary replicas
    AsyncTable<?> table = getTable.get();
    readAndCheck(table, -1);
    // make sure that the primary request has been canceled: once the read has returned, the
    // number of requests seen by the primary must stop growing
    Thread.sleep(5000);
    int count = getPrimaryGetCount();
    Thread.sleep(10000);
    assertEquals(count, getPrimaryGetCount());
  }

  @Test
  public void testReadSpecificReplica() throws Exception {
    FAIL_PRIMARY_GET = false;
    REPLICA_ID_TO_COUNT.clear();
    AsyncTable<?> table = getTable.get();
    for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
      readAndCheck(table, replicaId);
      // exactly one request must have reached the replica that was asked for
      assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());
    }
  }
}

View File

@ -18,211 +18,38 @@
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncTableRegionReplicasGet {
public class TestAsyncTableRegionReplicasGet extends AbstractTestAsyncTableRegionReplicasRead {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasGet.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("async");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] QUALIFIER = Bytes.toBytes("cq");
private static byte[] ROW = Bytes.toBytes("row");
private static byte[] VALUE = Bytes.toBytes("value");
private static int REPLICA_COUNT = 3;
private static AsyncConnection ASYNC_CONN;
@Rule
public TestName testName = new TestName();
@Parameter
public Supplier<AsyncTable<?>> getTable;
private static AsyncTable<?> getRawTable() {
return ASYNC_CONN.getTable(TABLE_NAME);
}
private static AsyncTable<?> getTable() {
return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
}
@Parameters
public static List<Object[]> params() {
return Arrays.asList(new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getRawTable },
new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getTable });
}
private static volatile boolean FAIL_PRIMARY_GET = false;
private static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
new ConcurrentHashMap<>();
public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
List<Cell> result) throws IOException {
RegionInfo region = c.getEnvironment().getRegionInfo();
if (!region.getTable().equals(TABLE_NAME)) {
return;
}
REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger())
.incrementAndGet();
if (region.getRegionId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) {
throw new IOException("Inject error");
}
}
}
private static boolean allReplicasHaveRow() throws IOException {
for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
if (region.get(new Get(ROW), false).isEmpty()) {
return false;
}
}
}
return true;
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// 10 mins
TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
TimeUnit.MINUTES.toMillis(10));
// 1 second
TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND,
TimeUnit.SECONDS.toMicros(1));
// set a small pause so we will retry very quickly
TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
// infinite retry
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
.setCoprocessor(FailPrimaryGetCP.class.getName()).build());
TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
startClusterAndCreateTable();
AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
// this is the fastest way to let all replicas have the row
TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow());
waitUntilAllReplicasHaveRow(ROW);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
IOUtils.closeQuietly(ASYNC_CONN);
TEST_UTIL.shutdownMiniCluster();
}
private static int getSecondaryGetCount() {
return REPLICA_ID_TO_COUNT.entrySet().stream()
.filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
.mapToInt(e -> e.getValue().get()).sum();
}
private static int getPrimaryGetCount() {
AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
return primaryGetCount != null ? primaryGetCount.get() : 0;
}
@Test
public void testNoReplicaRead() throws Exception {
FAIL_PRIMARY_GET = false;
REPLICA_ID_TO_COUNT.clear();
AsyncTable<?> table = getTable.get();
@Override
protected void readAndCheck(AsyncTable<?> table, int replicaId) throws Exception {
Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
for (int i = 0; i < 1000; i++) {
assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
}
// the primary region is fine and the primary timeout is 1 second which is long enough, so we
// should not send any requests to secondary replicas even if the consistency is timeline.
Thread.sleep(5000);
assertEquals(0, getSecondaryGetCount());
}
@Test
public void testReplicaRead() throws Exception {
// fail the primary get request
FAIL_PRIMARY_GET = true;
REPLICA_ID_TO_COUNT.clear();
Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
// make sure that we could still get the value from secondary replicas
AsyncTable<?> table = getTable.get();
assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
// make sure that the primary request has been canceled
Thread.sleep(5000);
int count = getPrimaryGetCount();
Thread.sleep(10000);
assertEquals(count, getPrimaryGetCount());
}
@Test
public void testReadSpecificReplica() throws Exception {
FAIL_PRIMARY_GET = false;
REPLICA_ID_TO_COUNT.clear();
Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
AsyncTable<?> table = getTable.get();
for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
if (replicaId >= 0) {
get.setReplicaId(replicaId);
assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());
}
assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
 * Runs the region replica read tests of {@link AbstractTestAsyncTableRegionReplicasRead} with a
 * scan as the read operation.
 */
@RunWith(Parameterized.class)
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncTableRegionReplicasScan extends AbstractTestAsyncTableRegionReplicasRead {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasScan.class);

  /**
   * Number of rows loaded into the table and expected back from every scan. The row keys are
   * zero padded to three digits, so their byte order only matches the numeric order while this
   * value does not exceed 1000.
   */
  private static final int ROW_COUNT = 1000;

  /** @return the row key of the i-th row, i.e. the base row name followed by the padded index */
  private static byte[] getRow(int i) {
    return Bytes.toBytes(String.format("%s-%03d", Bytes.toString(ROW), i));
  }

  /** @return the value stored in the i-th row */
  private static byte[] getValue(int i) {
    return Bytes.toBytes(String.format("%s-%03d", Bytes.toString(VALUE), i));
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    startClusterAndCreateTable();
    AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
    for (int i = 0; i < ROW_COUNT; i++) {
      table.put(new Put(getRow(i)).addColumn(FAMILY, QUALIFIER, getValue(i))).get();
    }
    // the rows are written in order, so waiting for the last one is enough
    waitUntilAllReplicasHaveRow(getRow(ROW_COUNT - 1));
  }

  /**
   * Scans the table with timeline consistency, one row per fetch, and checks that the first
   * {@link #ROW_COUNT} results carry the expected values in order.
   * @param replicaId the replica to scan; a negative value leaves the replica unset
   */
  @Override
  protected void readAndCheck(AsyncTable<?> table, int replicaId) throws IOException {
    Scan scan = new Scan().setConsistency(Consistency.TIMELINE).setCaching(1);
    if (replicaId >= 0) {
      scan.setReplicaId(replicaId);
    }
    try (ResultScanner scanner = table.getScanner(scan)) {
      // verify exactly the rows that setUpBeforeClass loaded
      for (int i = 0; i < ROW_COUNT; i++) {
        Result result = scanner.next();
        assertNotNull(result);
        assertArrayEquals(getValue(i), result.getValue(FAMILY, QUALIFIER));
      }
    }
  }
}