HBASE-17372 Make AsyncTable thread safe

zhangduo 2017-01-17 09:55:23 +08:00
parent 4cb09a494c
commit 4ab95ebbce
22 changed files with 587 additions and 317 deletions

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java

@@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
@@ -40,7 +39,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -61,7 +59,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.util.AtomicUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -102,9 +99,7 @@ class AsyncBatchRpcRetryingCaller<T> {
   private final long operationTimeoutNs;

-  private final long readRpcTimeoutNs;
-
-  private final long writeRpcTimeoutNs;
+  private final long rpcTimeoutNs;

   private final int startLogErrorsCnt;
@@ -128,39 +123,22 @@ class AsyncBatchRpcRetryingCaller<T> {
     public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
         new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

-    public final AtomicLong rpcTimeoutNs;
-
-    public ServerRequest(long defaultRpcTimeoutNs) {
-      this.rpcTimeoutNs = new AtomicLong(defaultRpcTimeoutNs);
-    }
-
-    public void addAction(HRegionLocation loc, Action action, long rpcTimeoutNs) {
+    public void addAction(HRegionLocation loc, Action action) {
       computeIfAbsent(actionsByRegion, loc.getRegionInfo().getRegionName(),
         () -> new RegionRequest(loc)).actions.add(action);
-      // try update the timeout to a larger value
-      if (this.rpcTimeoutNs.get() <= 0) {
-        return;
-      }
-      if (rpcTimeoutNs <= 0) {
-        this.rpcTimeoutNs.set(-1L);
-        return;
-      }
-      AtomicUtils.updateMax(this.rpcTimeoutNs, rpcTimeoutNs);
     }
   }

   public AsyncBatchRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
-      TableName tableName, List<? extends Row> actions, long pauseNs, int maxRetries,
-      long operationTimeoutNs, long readRpcTimeoutNs, long writeRpcTimeoutNs,
-      int startLogErrorsCnt) {
+      TableName tableName, List<? extends Row> actions, long pauseNs, int maxAttempts,
+      long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.conn = conn;
     this.tableName = tableName;
     this.pauseNs = pauseNs;
-    this.maxAttempts = retries2Attempts(maxRetries);
+    this.maxAttempts = maxAttempts;
     this.operationTimeoutNs = operationTimeoutNs;
-    this.readRpcTimeoutNs = readRpcTimeoutNs;
-    this.writeRpcTimeoutNs = writeRpcTimeoutNs;
+    this.rpcTimeoutNs = rpcTimeoutNs;
     this.startLogErrorsCnt = startLogErrorsCnt;
     this.actions = new ArrayList<>(actions.size());
@@ -366,7 +344,7 @@ class AsyncBatchRpcRetryingCaller<T> {
       return;
     }
     HBaseRpcController controller = conn.rpcControllerFactory.newController();
-    resetController(controller, Math.min(serverReq.rpcTimeoutNs.get(), remainingNs));
+    resetController(controller, Math.min(rpcTimeoutNs, remainingNs));
     if (!cells.isEmpty()) {
       controller.setCellScanner(createCellScanner(cells));
     }
@@ -416,10 +394,6 @@ class AsyncBatchRpcRetryingCaller<T> {
     retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
   }

-  private long getRpcTimeoutNs(Action action) {
-    return action.getAction() instanceof Get ? readRpcTimeoutNs : writeRpcTimeoutNs;
-  }
-
   private void groupAndSend(Stream<Action> actions, int tries) {
     long locateTimeoutNs;
     if (operationTimeoutNs > 0) {
@@ -433,15 +407,6 @@ class AsyncBatchRpcRetryingCaller<T> {
     }
     ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
     ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
-    // use the small one as the default timeout value, and increase the timeout value if we have an
-    // action in the group needs a larger timeout value.
-    long defaultRpcTimeoutNs;
-    if (readRpcTimeoutNs > 0) {
-      defaultRpcTimeoutNs =
-          writeRpcTimeoutNs > 0 ? Math.min(readRpcTimeoutNs, writeRpcTimeoutNs) : readRpcTimeoutNs;
-    } else {
-      defaultRpcTimeoutNs = writeRpcTimeoutNs > 0 ? writeRpcTimeoutNs : -1L;
-    }
     CompletableFuture.allOf(actions
         .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
           RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
@@ -454,9 +419,8 @@ class AsyncBatchRpcRetryingCaller<T> {
             addError(action, error, null);
             locateFailed.add(action);
           } else {
-            computeIfAbsent(actionsByServer, loc.getServerName(),
-              () -> new ServerRequest(defaultRpcTimeoutNs)).addAction(loc, action,
-              getRpcTimeoutNs(action));
+            computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new)
+                .addAction(loc, action);
           }
         }))
         .toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java

@@ -55,14 +55,21 @@ class AsyncClientScanner {
   private final AsyncConnectionImpl conn;

+  private final long pauseNs;
+
+  private final int maxAttempts;
+
   private final long scanTimeoutNs;

   private final long rpcTimeoutNs;

+  private final int startLogErrorsCnt;
+
   private final ScanResultCache resultCache;

   public AsyncClientScanner(Scan scan, RawScanResultConsumer consumer, TableName tableName,
-      AsyncConnectionImpl conn, long scanTimeoutNs, long rpcTimeoutNs) {
+      AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long scanTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt) {
     if (scan.getStartRow() == null) {
       scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
     }
@@ -73,8 +80,11 @@ class AsyncClientScanner {
     this.consumer = consumer;
     this.tableName = tableName;
     this.conn = conn;
+    this.pauseNs = pauseNs;
+    this.maxAttempts = maxAttempts;
     this.scanTimeoutNs = scanTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
+    this.startLogErrorsCnt = startLogErrorsCnt;
     this.resultCache = scan.getAllowPartialResults() || scan.getBatch() > 0
         ? new AllowPartialScanResultCache() : new CompleteScanResultCache();
   }
@@ -117,7 +127,9 @@ class AsyncClientScanner {
     conn.callerFactory.scanSingleRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub)
         .setScan(scan).consumer(consumer).resultCache(resultCache)
         .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start().whenComplete((hasMore, error) -> {
+        .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start()
+        .whenComplete((hasMore, error) -> {
           if (error != null) {
             consumer.onError(error);
             return;
@@ -133,8 +145,9 @@ class AsyncClientScanner {
   private void openScanner() {
     conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
         .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::callOpenScanner).call()
-        .whenComplete((resp, error) -> {
+        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
+        .call().whenComplete((resp, error) -> {
           if (error != null) {
             consumer.onError(error);
             return;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java

@@ -50,21 +50,32 @@ public interface AsyncConnection extends Closeable {
   AsyncTableRegionLocator getRegionLocator(TableName tableName);

   /**
-   * Retrieve an RawAsyncTable implementation for accessing a table. The returned Table is not
-   * thread safe, a new instance should be created for each using thread. This is a lightweight
-   * operation, pooling or caching of the returned AsyncTable is neither required nor desired.
+   * Retrieve an {@link RawAsyncTable} implementation for accessing a table.
+   * <p>
+   * The returned instance will use default configs. Use {@link #getRawTableBuilder(TableName)} if
+   * you want to customize some configs.
    * <p>
    * This method no longer checks table existence. An exception will be thrown if the table does not
    * exist only when the first operation is attempted.
    * @param tableName the name of the table
    * @return an RawAsyncTable to use for interactions with this table
+   * @see #getRawTableBuilder(TableName)
    */
-  RawAsyncTable getRawTable(TableName tableName);
+  default RawAsyncTable getRawTable(TableName tableName) {
+    return getRawTableBuilder(tableName).build();
+  }

   /**
-   * Retrieve an AsyncTable implementation for accessing a table. The returned Table is not thread
-   * safe, a new instance should be created for each using thread. This is a lightweight operation,
-   * pooling or caching of the returned AsyncTable is neither required nor desired.
+   * Returns an {@link AsyncTableBuilder} for creating {@link RawAsyncTable}.
+   * <p>
+   * This method no longer checks table existence. An exception will be thrown if the table does not
+   * exist only when the first operation is attempted.
+   * @param tableName the name of the table
+   */
+  AsyncTableBuilder<RawAsyncTable> getRawTableBuilder(TableName tableName);
+
+  /**
+   * Retrieve an AsyncTable implementation for accessing a table.
    * <p>
    * This method no longer checks table existence. An exception will be thrown if the table does not
    * exist only when the first operation is attempted.
@@ -72,5 +83,17 @@ public interface AsyncConnection extends Closeable {
    * @param pool the thread pool to use for executing callback
    * @return an AsyncTable to use for interactions with this table
    */
-  AsyncTable getTable(TableName tableName, ExecutorService pool);
+  default AsyncTable getTable(TableName tableName, ExecutorService pool) {
+    return getTableBuilder(tableName, pool).build();
+  }
+
+  /**
+   * Returns an {@link AsyncTableBuilder} for creating {@link AsyncTable}.
+   * <p>
+   * This method no longer checks table existence. An exception will be thrown if the table does not
+   * exist only when the first operation is attempted.
+   * @param tableName the name of the table
+   * @param pool the thread pool to use for executing callback
+   */
+  AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool);
 }
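The builder indirection above is what makes per-use customization cheap: getRawTable(tableName) is now just getRawTableBuilder(tableName).build(). A minimal usage sketch, assuming a connection obtained elsewhere (the table name and timeout values are made up):

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.RawAsyncTable;

public class TableBuilderUsage {
  // conn is an AsyncConnection created elsewhere; "test" is a hypothetical table.
  static RawAsyncTable customizedTable(AsyncConnection conn) {
    return conn.getRawTableBuilder(TableName.valueOf("test"))
        .setRpcTimeout(1, TimeUnit.SECONDS)        // per-rpc cap
        .setOperationTimeout(10, TimeUnit.SECONDS) // whole-operation cap
        .setMaxRetries(5)
        .build();
  }
}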

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java

@@ -56,6 +56,10 @@ class AsyncConnectionConfiguration {
   // by this value, see scanTimeoutNs.
   private final long operationTimeoutNs;

+  // timeout for each rpc request. Can be overridden by a more specific config, such as
+  // readRpcTimeout or writeRpcTimeout.
+  private final long rpcTimeoutNs;
+
   // timeout for each read rpc request
   private final long readRpcTimeoutNs;
@@ -85,10 +89,12 @@ class AsyncConnectionConfiguration {
       conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
     this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
       conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
-    this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY,
-      conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
-    this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY,
-      conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
+    this.rpcTimeoutNs = TimeUnit.MILLISECONDS
+        .toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
+    this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
+      conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, TimeUnit.NANOSECONDS.toMillis(rpcTimeoutNs)));
+    this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
+      conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, TimeUnit.NANOSECONDS.toMillis(rpcTimeoutNs)));
     this.pauseNs =
       TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
     this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
@@ -111,6 +117,10 @@ class AsyncConnectionConfiguration {
     return operationTimeoutNs;
   }

+  long getRpcTimeoutNs() {
+    return rpcTimeoutNs;
+  }
+
   long getReadRpcTimeoutNs() {
     return readRpcTimeoutNs;
   }
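A small sketch of the fallback semantics above, using the real config keys (the values are made up): hbase.rpc.read.timeout and hbase.rpc.write.timeout now default to the base hbase.rpc.timeout when unset.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class RpcTimeoutConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setLong("hbase.rpc.timeout", 30000L);       // base rpc timeout: 30s
    conf.setLong("hbase.rpc.write.timeout", 60000L); // writes get 60s
    // hbase.rpc.read.timeout is left unset, so reads fall back to the 30s base
  }
}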

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java

@@ -18,8 +18,6 @@
 package org.apache.hadoop.hbase.client;

 import static org.apache.hadoop.hbase.HConstants.CLUSTER_ID_DEFAULT;
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
-import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
 import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
@@ -90,7 +88,6 @@ class AsyncConnectionImpl implements AsyncConnection {
   private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();

-  @SuppressWarnings("deprecation")
   public AsyncConnectionImpl(Configuration conf, User user) {
     this.conf = conf;
     this.user = user;
@@ -105,7 +102,8 @@ class AsyncConnectionImpl implements AsyncConnection {
     this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
-    this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
+    this.rpcTimeout = (int) Math.min(Integer.MAX_VALUE,
+      TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
     this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
     this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
     if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
@@ -152,12 +150,25 @@ class AsyncConnectionImpl implements AsyncConnection {
   }

   @Override
-  public RawAsyncTable getRawTable(TableName tableName) {
-    return new RawAsyncTableImpl(this, tableName);
+  public AsyncTableBuilder<RawAsyncTable> getRawTableBuilder(TableName tableName) {
+    return new AsyncTableBuilderBase<RawAsyncTable>(tableName, connConf) {
+
+      @Override
+      public RawAsyncTable build() {
+        return new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
+      }
+    };
   }

   @Override
-  public AsyncTable getTable(TableName tableName, ExecutorService pool) {
-    return new AsyncTableImpl(this, tableName, pool);
+  public AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool) {
+    return new AsyncTableBuilderBase<AsyncTable>(tableName, connConf) {
+
+      @Override
+      public AsyncTable build() {
+        RawAsyncTableImpl rawTable = new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
+        return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
+      }
+    };
   }
 }

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java

@@ -71,7 +71,7 @@ class AsyncRegionLocator {
       future.completeExceptionally(new TimeoutIOException(timeoutMsg.get()));
     }, timeoutNs, TimeUnit.NANOSECONDS);
     return future.whenComplete((loc, error) -> {
-      if (error.getClass() != TimeoutIOException.class) {
+      if (error != null && error.getClass() != TimeoutIOException.class) {
         // cancel timeout task if we are not completed by it.
         timeoutTask.cancel();
       }
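The null check above matters because CompletableFuture.whenComplete also runs on successful completion, with a null throwable; the old code would throw a NullPointerException on every successful locate. A standalone illustration of the pattern:

import java.util.concurrent.CompletableFuture;

public class WhenCompleteNullCheck {
  public static void main(String[] args) {
    CompletableFuture<String> f = CompletableFuture.completedFuture("loc");
    f.whenComplete((v, error) -> {
      // error is null here because the future completed normally
      if (error != null) {
        System.err.println("failed: " + error);
      } else {
        System.out.println("succeeded: " + v);
      }
    });
  }
}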

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;

 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;

 import io.netty.util.HashedWheelTimer;
@@ -46,7 +47,16 @@ class AsyncRpcRetryingCallerFactory {
     this.retryTimer = retryTimer;
   }

-  public class SingleRequestCallerBuilder<T> {
+  private abstract class BuilderBase {
+
+    protected long pauseNs = conn.connConf.getPauseNs();
+
+    protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries());
+
+    protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt();
+  }
+
+  public class SingleRequestCallerBuilder<T> extends BuilderBase {

     private TableName tableName;
@@ -91,12 +101,26 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }

+    public SingleRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
+      this.pauseNs = unit.toNanos(pause);
+      return this;
+    }
+
+    public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
+      this.maxAttempts = maxAttempts;
+      return this;
+    }
+
+    public SingleRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
+      this.startLogErrorsCnt = startLogErrorsCnt;
+      return this;
+    }
+
     public AsyncSingleRequestRpcRetryingCaller<T> build() {
       return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
           checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
           checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
-          conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
-          rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+          pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     }

   /**
@@ -114,7 +138,7 @@ class AsyncRpcRetryingCallerFactory {
     return new SingleRequestCallerBuilder<>();
   }

-  public class SmallScanCallerBuilder {
+  public class SmallScanCallerBuilder extends BuilderBase {

     private TableName tableName;
@@ -151,12 +175,27 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }

+    public SmallScanCallerBuilder pause(long pause, TimeUnit unit) {
+      this.pauseNs = unit.toNanos(pause);
+      return this;
+    }
+
+    public SmallScanCallerBuilder maxAttempts(int maxAttempts) {
+      this.maxAttempts = maxAttempts;
+      return this;
+    }
+
+    public SmallScanCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
+      this.startLogErrorsCnt = startLogErrorsCnt;
+      return this;
+    }
+
     public AsyncSmallScanRpcRetryingCaller build() {
       TableName tableName = checkNotNull(this.tableName, "tableName is null");
       Scan scan = checkNotNull(this.scan, "scan is null");
       checkArgument(limit > 0, "invalid limit %d", limit);
-      return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, scanTimeoutNs,
-          rpcTimeoutNs);
+      return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, pauseNs, maxAttempts,
+          scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     }

   /**
@@ -174,7 +213,7 @@ class AsyncRpcRetryingCallerFactory {
     return new SmallScanCallerBuilder();
   }

-  public class ScanSingleRegionCallerBuilder {
+  public class ScanSingleRegionCallerBuilder extends BuilderBase {

     private long scannerId = -1L;
@@ -232,15 +271,29 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }

+    public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) {
+      this.pauseNs = unit.toNanos(pause);
+      return this;
+    }
+
+    public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) {
+      this.maxAttempts = maxAttempts;
+      return this;
+    }
+
+    public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
+      this.startLogErrorsCnt = startLogErrorsCnt;
+      return this;
+    }
+
     public AsyncScanSingleRegionRpcRetryingCaller build() {
       checkArgument(scannerId >= 0, "invalid scannerId %d", scannerId);
       return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
           checkNotNull(scan, "scan is null"), scannerId,
           checkNotNull(resultCache, "resultCache is null"),
           checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
-          checkNotNull(loc, "location is null"), conn.connConf.getPauseNs(),
-          conn.connConf.getMaxRetries(), scanTimeoutNs, rpcTimeoutNs,
-          conn.connConf.getStartLogErrorsCnt());
+          checkNotNull(loc, "location is null"), pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs,
+          startLogErrorsCnt);
     }

   /**
@@ -258,7 +311,7 @@ class AsyncRpcRetryingCallerFactory {
     return new ScanSingleRegionCallerBuilder();
   }

-  public class BatchCallerBuilder {
+  public class BatchCallerBuilder extends BuilderBase {

     private TableName tableName;
@@ -266,9 +319,7 @@ class AsyncRpcRetryingCallerFactory {
     private long operationTimeoutNs = -1L;

-    private long readRpcTimeoutNs = -1L;
-
-    private long writeRpcTimeoutNs = -1L;
+    private long rpcTimeoutNs = -1L;

     public BatchCallerBuilder table(TableName tableName) {
       this.tableName = tableName;
@@ -285,20 +336,29 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }

-    public BatchCallerBuilder readRpcTimeout(long rpcTimeout, TimeUnit unit) {
-      this.readRpcTimeoutNs = unit.toNanos(rpcTimeout);
+    public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
+      this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
       return this;
     }

-    public BatchCallerBuilder writeRpcTimeout(long rpcTimeout, TimeUnit unit) {
-      this.writeRpcTimeoutNs = unit.toNanos(rpcTimeout);
+    public BatchCallerBuilder pause(long pause, TimeUnit unit) {
+      this.pauseNs = unit.toNanos(pause);
+      return this;
+    }
+
+    public BatchCallerBuilder maxAttempts(int maxAttempts) {
+      this.maxAttempts = maxAttempts;
+      return this;
+    }
+
+    public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
+      this.startLogErrorsCnt = startLogErrorsCnt;
       return this;
     }

     public <T> AsyncBatchRpcRetryingCaller<T> build() {
-      return new AsyncBatchRpcRetryingCaller<T>(retryTimer, conn, tableName, actions,
-          conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
-          readRpcTimeoutNs, writeRpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+      return new AsyncBatchRpcRetryingCaller<T>(retryTimer, conn, tableName, actions, pauseNs,
+          maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     }

     public <T> List<CompletableFuture<T>> call() {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java

@@ -108,7 +108,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
       AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache,
       RawScanResultConsumer consumer, Interface stub, HRegionLocation loc, long pauseNs,
-      int maxRetries, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+      int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.scan = scan;
     this.scannerId = scannerId;
@@ -117,7 +117,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     this.stub = stub;
     this.loc = loc;
     this.pauseNs = pauseNs;
-    this.maxAttempts = retries2Attempts(maxRetries);
+    this.maxAttempts = maxAttempts;
     this.scanTimeoutNs = scanTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
     this.startLogErrorsCnt = startLogErrorsCnt;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client;

 import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;

 import io.netty.util.HashedWheelTimer;
@@ -90,7 +89,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
   public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
       TableName tableName, byte[] row, RegionLocateType locateType, Callable<T> callable,
-      long pauseNs, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs,
+      long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
       int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.conn = conn;
@@ -99,7 +98,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
     this.locateType = locateType;
     this.callable = callable;
     this.pauseNs = pauseNs;
-    this.maxAttempts = retries2Attempts(maxRetries);
+    this.maxAttempts = maxAttempts;
     this.operationTimeoutNs = operationTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
     this.startLogErrorsCnt = startLogErrorsCnt;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java

@@ -57,6 +57,12 @@ class AsyncSmallScanRpcRetryingCaller {
   private final long rpcTimeoutNs;

+  private final long pauseNs;
+
+  private final int maxAttempts;
+
+  private final int startLogErrorsCnt;
+
   private final Function<HRegionInfo, Boolean> nextScan;

   private final List<Result> resultList;
@@ -64,13 +70,17 @@ class AsyncSmallScanRpcRetryingCaller {
   private final CompletableFuture<List<Result>> future;

   public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan,
-      int limit, long scanTimeoutNs, long rpcTimeoutNs) {
+      int limit, long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
+      int startLogErrorsCnt) {
     this.conn = conn;
     this.tableName = tableName;
     this.scan = scan;
     this.limit = limit;
     this.scanTimeoutNs = scanTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
+    this.pauseNs = pauseNs;
+    this.maxAttempts = maxAttempts;
+    this.startLogErrorsCnt = startLogErrorsCnt;
     if (scan.isReversed()) {
       this.nextScan = this::reversedNextScan;
     } else {
@@ -146,7 +156,8 @@ class AsyncSmallScanRpcRetryingCaller {
   private void scan() {
     conn.callerFactory.<SmallScanResponse> single().table(tableName).row(scan.getStartRow())
         .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::scan).call()
+        .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::scan).call()
         .whenComplete((resp, error) -> {
           if (error != null) {
             future.completeExceptionally(error);

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java

@@ -23,9 +23,11 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 /**
  * The asynchronous table for normal users.
  * <p>
+ * The implementation is required to be thread safe.
+ * <p>
  * The implementation should make sure that user can do everything they want to the returned
- * {@code CompletableFuture} without break anything. Usually the implementation will require user to
- * provide a {@code ExecutorService}.
+ * {@code CompletableFuture} without breaking anything. Usually the implementation will require user
+ * to provide a {@code ExecutorService}.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
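Since the table is now required to be thread safe, one instance can be shared across threads instead of creating one per thread. A minimal sketch, assuming a connection and pool created elsewhere (the table name and row keys are made up):

import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.util.Bytes;

public class SharedTableSketch {
  static void fanOut(AsyncConnection conn, ExecutorService pool) {
    // One shared instance; before this change each thread needed its own.
    AsyncTable table = conn.getTable(TableName.valueOf("test"), pool);
    for (int i = 0; i < 4; i++) {
      final int n = i;
      pool.execute(() -> table.get(new Get(Bytes.toBytes("row-" + n)))
          .thenAccept(r -> System.out.println("got " + r)));
    }
  }
}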

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java

@@ -18,9 +18,8 @@
 package org.apache.hadoop.hbase.client;

 import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.voidBatch;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.voidBatchAll;

 import com.google.common.base.Preconditions;
@@ -39,10 +38,9 @@ import org.apache.hadoop.hbase.util.Bytes;
  * The base interface for asynchronous version of Table. Obtain an instance from a
  * {@link AsyncConnection}.
  * <p>
- * The implementation is NOT required to be thread safe. Do NOT access it from multiple threads
- * concurrently.
+ * The implementation is required to be thread safe.
  * <p>
- * Usually the implementations will not throw any exception directly, you need to get the exception
+ * Usually the implementation will not throw any exception directly. You need to get the exception
  * from the returned {@link CompletableFuture}.
  */
 @InterfaceAudience.Public
@@ -62,60 +60,31 @@ public interface AsyncTableBase {
   Configuration getConfiguration();

   /**
-   * Set timeout of each rpc read request in operations of this Table instance, will override the
-   * value of {@code hbase.rpc.read.timeout} in configuration. If a rpc read request waiting too
-   * long, it will stop waiting and send a new request to retry until retries exhausted or operation
-   * timeout reached.
+   * Get timeout of each rpc request in this Table instance. It will be overridden by a more
+   * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
+   * @see #getReadRpcTimeout(TimeUnit)
+   * @see #getWriteRpcTimeout(TimeUnit)
    */
-  void setReadRpcTimeout(long timeout, TimeUnit unit);
+  long getRpcTimeout(TimeUnit unit);

   /**
    * Get timeout of each rpc read request in this Table instance.
    */
   long getReadRpcTimeout(TimeUnit unit);

-  /**
-   * Set timeout of each rpc write request in operations of this Table instance, will override the
-   * value of {@code hbase.rpc.write.timeout} in configuration. If a rpc write request waiting too
-   * long, it will stop waiting and send a new request to retry until retries exhausted or operation
-   * timeout reached.
-   */
-  void setWriteRpcTimeout(long timeout, TimeUnit unit);
-
   /**
    * Get timeout of each rpc write request in this Table instance.
    */
   long getWriteRpcTimeout(TimeUnit unit);

-  /**
-   * Set timeout of each operation in this Table instance, will override the value of
-   * {@code hbase.client.operation.timeout} in configuration.
-   * <p>
-   * Operation timeout is a top-level restriction that makes sure an operation will not be blocked
-   * more than this. In each operation, if rpc request fails because of timeout or other reason, it
-   * will retry until success or throw a RetriesExhaustedException. But if the total time elapsed
-   * reach the operation timeout before retries exhausted, it will break early and throw
-   * SocketTimeoutException.
-   */
-  void setOperationTimeout(long timeout, TimeUnit unit);
-
   /**
    * Get timeout of each operation in Table instance.
    */
   long getOperationTimeout(TimeUnit unit);

   /**
-   * Set timeout of a single operation in a scan, such as openScanner and next. Will override the
-   * value {@code hbase.client.scanner.timeout.period} in configuration.
-   * <p>
-   * Generally a scan will never timeout after we add heartbeat support unless the region is
-   * crashed. The {@code scanTimeout} works like the {@code operationTimeout} for each single
-   * operation in a scan.
-   */
-  void setScanTimeout(long timeout, TimeUnit unit);
-
-  /**
-   * Get the timeout of a single operation in a scan.
+   * Get the timeout of a single operation in a scan. It works like operation timeout for other
+   * operations.
    */
   long getScanTimeout(TimeUnit unit);
@@ -352,29 +321,6 @@
    */
   CompletableFuture<List<Result>> smallScan(Scan scan, int limit);

-  /**
-   * Extracts certain cells from the given rows, in batch.
-   * <p>
-   * Notice that you may not get all the results with this function, which means some of the
-   * returned {@link CompletableFuture}s may succeed while some of the other returned
-   * {@link CompletableFuture}s may fail.
-   * @param gets The objects that specify what data to fetch and from which rows.
-   * @return A list of {@link CompletableFuture}s that represent the result for each get.
-   */
-  default List<CompletableFuture<Result>> get(List<Get> gets) {
-    return batch(gets);
-  }
-
-  /**
-   * A simple version for batch get. It will fail if there are any failures and you will get the
-   * whole result list at once if the operation is succeeded.
-   * @param gets The objects that specify what data to fetch and from which rows.
-   * @return A {@link CompletableFuture} that wraps the result list.
-   */
-  default CompletableFuture<List<Result>> getAll(List<Get> gets) {
-    return batchAll(gets);
-  }
-
   /**
    * Test for the existence of columns in the table, as specified by the Gets.
    * <p>
@@ -386,8 +332,8 @@
    * @return A list of {@link CompletableFuture}s that represent the existence for each get.
    */
   default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
-    return get(toCheckExistenceOnly(gets)).stream().
-        <CompletableFuture<Boolean>>map(f -> f.thenApply(r -> r.getExists())).collect(toList());
+    return get(toCheckExistenceOnly(gets)).stream()
+        .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
   }
@@ -397,8 +343,28 @@
    * @return A {@link CompletableFuture} that wraps the result boolean list.
    */
   default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
-    return getAll(toCheckExistenceOnly(gets))
-        .thenApply(l -> l.stream().map(r -> r.getExists()).collect(toList()));
+    return allOf(exists(gets));
   }

+  /**
+   * Extracts certain cells from the given rows, in batch.
+   * <p>
+   * Notice that you may not get all the results with this function, which means some of the
+   * returned {@link CompletableFuture}s may succeed while some of the other returned
+   * {@link CompletableFuture}s may fail.
+   * @param gets The objects that specify what data to fetch and from which rows.
+   * @return A list of {@link CompletableFuture}s that represent the result for each get.
+   */
+  List<CompletableFuture<Result>> get(List<Get> gets);
+
+  /**
+   * A simple version for batch get. It will fail if there are any failures and you will get the
+   * whole result list at once if the operation succeeds.
+   * @param gets The objects that specify what data to fetch and from which rows.
+   * @return A {@link CompletableFuture} that wraps the result list.
+   */
+  default CompletableFuture<List<Result>> getAll(List<Get> gets) {
+    return allOf(get(gets));
+  }
@@ -406,9 +372,7 @@
    * @param puts The list of mutations to apply.
    * @return A list of {@link CompletableFuture}s that represent the result for each put.
    */
-  default List<CompletableFuture<Void>> put(List<Put> puts) {
-    return voidBatch(this, puts);
-  }
+  List<CompletableFuture<Void>> put(List<Put> puts);
@@ -416,7 +380,7 @@
    * @return A {@link CompletableFuture} that always returns null when complete normally.
    */
   default CompletableFuture<Void> putAll(List<Put> puts) {
-    return voidBatchAll(this, puts);
+    return allOf(put(puts)).thenApply(r -> null);
   }
@@ -424,9 +388,7 @@
    * @param deletes list of things to delete.
    * @return A list of {@link CompletableFuture}s that represent the result for each delete.
    */
-  default List<CompletableFuture<Void>> delete(List<Delete> deletes) {
-    return voidBatch(this, deletes);
-  }
+  List<CompletableFuture<Void>> delete(List<Delete> deletes);
@@ -434,7 +396,7 @@
    * @return A {@link CompletableFuture} that always returns null when complete normally.
    */
   default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
-    return voidBatchAll(this, deletes);
+    return allOf(delete(deletes)).thenApply(r -> null);
   }
@@ -454,8 +416,6 @@
    * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
    */
   default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
-    List<CompletableFuture<T>> futures = batch(actions);
-    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
-        .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
+    return allOf(batch(actions));
   }
 }
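A short sketch of the batch helpers above (the table instance and row keys are hypothetical): get returns one future per row, so callers can handle partial failure, while getAll folds them into a single all-or-nothing future via allOf.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RawAsyncTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchGetSketch {
  static void batchGet(RawAsyncTable table) {
    List<Get> gets = Arrays.asList(
        new Get(Bytes.toBytes("row1")), new Get(Bytes.toBytes("row2")));
    // Per-row futures: some may succeed while others fail.
    List<CompletableFuture<Result>> perRow = table.get(gets);
    perRow.forEach(f -> f.whenComplete((r, e) -> {
      if (e != null) {
        System.err.println("one get failed: " + e);
      }
    }));
    // All-or-nothing: fails if any single get fails.
    table.getAll(gets).thenAccept(results -> results.forEach(System.out::println));
  }
}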

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java

@@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* For creating {@link AsyncTable} or {@link RawAsyncTable}.
* <p>
* The implementation should have default configurations set before returning the builder to the
* user, so users only need to set the configs they care about when creating a new
* AsyncTable/RawAsyncTable instance.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface AsyncTableBuilder<T extends AsyncTableBase> {
/**
* Set timeout for a whole operation such as get, put or delete. Notice that scan will not be
* affected by this value; see {@link #setScanTimeout(long, TimeUnit)}.
* <p>
* Operation timeout and max attempt times (or max retry times) are both limits on retrying; we
* will stop retrying when we reach either limit.
* @see #setMaxAttempts(int)
* @see #setMaxRetries(int)
* @see #setScanTimeout(long, TimeUnit)
*/
AsyncTableBuilder<T> setOperationTimeout(long timeout, TimeUnit unit);
/**
* Now that we have heartbeat support for scan, ideally a scan will never time out unless the RS
* crashes. The RS will always return something before the rpc timeout or the scan timeout is
* reached, to tell the client that it is still alive. The scan timeout is used as the operation
* timeout for every operation in a scan, such as openScanner or next.
* @see #setOperationTimeout(long, TimeUnit)
*/
AsyncTableBuilder<T> setScanTimeout(long timeout, TimeUnit unit);
/**
* Set timeout for each rpc request.
* <p>
* Notice that this will <strong>NOT</strong> change the rpc timeout for read (get, scan)
* requests and write (put, delete) requests.
*/
AsyncTableBuilder<T> setRpcTimeout(long timeout, TimeUnit unit);
/**
* Set timeout for each read(get, scan) rpc request.
*/
AsyncTableBuilder<T> setReadRpcTimeout(long timeout, TimeUnit unit);
/**
* Set timeout for each write(put, delete) rpc request.
*/
AsyncTableBuilder<T> setWriteRpcTimeout(long timeout, TimeUnit unit);
/**
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
* retrying.
*/
AsyncTableBuilder<T> setRetryPause(long pause, TimeUnit unit);
/**
* Set the max retry times for an operation. Usually it is the max attempt times minus 1.
* <p>
* Operation timeout and max attempt times (or max retry times) are both limits on retrying; we
* will stop retrying when we reach either limit.
* @see #setMaxAttempts(int)
* @see #setOperationTimeout(long, TimeUnit)
*/
default AsyncTableBuilder<T> setMaxRetries(int maxRetries) {
return setMaxAttempts(retries2Attempts(maxRetries));
}
/**
* Set the max attempt times for an operation. Usually it is the max retry times plus 1.
* Operation timeout and max attempt times (or max retry times) are both limits on retrying; we
* will stop retrying when we reach either limit.
* @see #setMaxRetries(int)
* @see #setOperationTimeout(long, TimeUnit)
*/
AsyncTableBuilder<T> setMaxAttempts(int maxAttempts);
/**
* Set the number of retries that are allowed before we start to log.
*/
AsyncTableBuilder<T> setStartLogErrorsCnt(int startLogErrorsCnt);
/**
* Create the {@link AsyncTable} or {@link RawAsyncTable} instance.
*/
T build();
}
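To make the retry/attempt relationship above concrete, a small sketch (the table name is made up):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.RawAsyncTable;

public class RetryKnobsSketch {
  static void equivalentBuilders(AsyncConnection conn) {
    // setMaxRetries(n) is sugar for setMaxAttempts(n + 1), per retries2Attempts,
    // so these two tables retry identically.
    RawAsyncTable a = conn.getRawTableBuilder(TableName.valueOf("t"))
        .setMaxRetries(7).build();
    RawAsyncTable b = conn.getRawTableBuilder(TableName.valueOf("t"))
        .setMaxAttempts(8).build();
  }
}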

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java

@@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Base class for all asynchronous table builders.
*/
@InterfaceAudience.Private
abstract class AsyncTableBuilderBase<T extends AsyncTableBase> implements AsyncTableBuilder<T> {
protected TableName tableName;
protected long operationTimeoutNs;
protected long scanTimeoutNs;
protected long rpcTimeoutNs;
protected long readRpcTimeoutNs;
protected long writeRpcTimeoutNs;
protected long pauseNs;
protected int maxAttempts;
protected int startLogErrorsCnt;
AsyncTableBuilderBase(TableName tableName, AsyncConnectionConfiguration connConf) {
this.tableName = tableName;
this.operationTimeoutNs = tableName.isSystemTable() ? connConf.getMetaOperationTimeoutNs()
: connConf.getOperationTimeoutNs();
this.scanTimeoutNs = connConf.getScanTimeoutNs();
this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs();
this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs();
this.pauseNs = connConf.getPauseNs();
this.maxAttempts = retries2Attempts(connConf.getMaxRetries());
this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
}
@Override
public AsyncTableBuilderBase<T> setOperationTimeout(long timeout, TimeUnit unit) {
this.operationTimeoutNs = unit.toNanos(timeout);
return this;
}
@Override
public AsyncTableBuilderBase<T> setScanTimeout(long timeout, TimeUnit unit) {
this.scanTimeoutNs = unit.toNanos(timeout);
return this;
}
@Override
public AsyncTableBuilderBase<T> setRpcTimeout(long timeout, TimeUnit unit) {
this.rpcTimeoutNs = unit.toNanos(timeout);
return this;
}
@Override
public AsyncTableBuilderBase<T> setReadRpcTimeout(long timeout, TimeUnit unit) {
this.readRpcTimeoutNs = unit.toNanos(timeout);
return this;
}
@Override
public AsyncTableBuilderBase<T> setWriteRpcTimeout(long timeout, TimeUnit unit) {
this.writeRpcTimeoutNs = unit.toNanos(timeout);
return this;
}
@Override
public AsyncTableBuilderBase<T> setRetryPause(long pause, TimeUnit unit) {
this.pauseNs = unit.toNanos(pause);
return this;
}
@Override
public AsyncTableBuilderBase<T> setMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
}
@Override
public AsyncTableBuilderBase<T> setStartLogErrorsCnt(int startLogErrorsCnt) {
this.startLogErrorsCnt = startLogErrorsCnt;
return this;
}
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java

@ -22,7 +22,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import static java.util.stream.Collectors.*;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -42,8 +42,8 @@ class AsyncTableImpl implements AsyncTable {
private final long defaultScannerMaxResultSize; private final long defaultScannerMaxResultSize;
public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName, ExecutorService pool) { AsyncTableImpl(AsyncConnectionImpl conn, RawAsyncTable rawTable, ExecutorService pool) {
this.rawTable = conn.getRawTable(tableName); this.rawTable = rawTable;
this.pool = pool; this.pool = pool;
this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
} }
@ -59,8 +59,8 @@ class AsyncTableImpl implements AsyncTable {
} }
@Override @Override
public void setReadRpcTimeout(long timeout, TimeUnit unit) { public long getRpcTimeout(TimeUnit unit) {
rawTable.setReadRpcTimeout(timeout, unit); return rawTable.getRpcTimeout(unit);
} }
@Override @Override
@ -68,31 +68,16 @@ class AsyncTableImpl implements AsyncTable {
return rawTable.getReadRpcTimeout(unit); return rawTable.getReadRpcTimeout(unit);
} }
@Override
public void setWriteRpcTimeout(long timeout, TimeUnit unit) {
rawTable.setWriteRpcTimeout(timeout, unit);
}
@Override @Override
public long getWriteRpcTimeout(TimeUnit unit) { public long getWriteRpcTimeout(TimeUnit unit) {
return rawTable.getWriteRpcTimeout(unit); return rawTable.getWriteRpcTimeout(unit);
} }
@Override
public void setOperationTimeout(long timeout, TimeUnit unit) {
rawTable.setOperationTimeout(timeout, unit);
}
@Override @Override
public long getOperationTimeout(TimeUnit unit) { public long getOperationTimeout(TimeUnit unit) {
return rawTable.getOperationTimeout(unit); return rawTable.getOperationTimeout(unit);
} }
@Override
public void setScanTimeout(long timeout, TimeUnit unit) {
rawTable.setScanTimeout(timeout, unit);
}
@Override @Override
public long getScanTimeout(TimeUnit unit) { public long getScanTimeout(TimeUnit unit) {
return rawTable.getScanTimeout(unit); return rawTable.getScanTimeout(unit);
@@ -193,8 +178,23 @@ class AsyncTableImpl implements AsyncTable {
     pool.execute(() -> scan0(scan, consumer));
   }

+  @Override
+  public List<CompletableFuture<Result>> get(List<Get> gets) {
+    return rawTable.get(gets).stream().map(this::wrap).collect(toList());
+  }
+
+  @Override
+  public List<CompletableFuture<Void>> put(List<Put> puts) {
+    return rawTable.put(puts).stream().map(this::wrap).collect(toList());
+  }
+
+  @Override
+  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
+    return rawTable.delete(deletes).stream().map(this::wrap).collect(toList());
+  }
+
   @Override
   public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
-    return rawTable.<T> batch(actions).stream().map(this::wrap).collect(Collectors.toList());
+    return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
   }
 }
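The wrap helper referenced via this::wrap is not shown in this section. A plausible sketch of what such a helper does, under the assumption that its job is to re-complete the raw future on the user-supplied pool so user callbacks never run on the rpc framework's event loop threads:

// Hypothetical sketch of the wrap helper: hand completion off to the pool.
private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
  CompletableFuture<T> wrapped = new CompletableFuture<>();
  future.whenComplete((result, error) -> pool.execute(() -> {
    if (error != null) {
      wrapped.completeExceptionally(error);
    } else {
      wrapped.complete(result);
    }
  }));
  return wrapped;
}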


@@ -342,16 +342,6 @@ public final class ConnectionUtils {
     return gets.stream().map(ConnectionUtils::toCheckExistenceOnly).collect(toList());
   }

-  static List<CompletableFuture<Void>> voidBatch(AsyncTableBase table,
-      List<? extends Row> actions) {
-    return table.<Object> batch(actions).stream().map(f -> f.<Void> thenApply(r -> null))
-        .collect(toList());
-  }
-
-  static CompletableFuture<Void> voidBatchAll(AsyncTableBase table, List<? extends Row> actions) {
-    return table.<Object> batchAll(actions).thenApply(r -> null);
-  }
-
   static RegionLocateType getLocateType(Scan scan) {
     if (scan.isReversed()) {
       if (isEmptyStartRow(scan.getStartRow())) {
@@ -389,4 +379,9 @@ public final class ConnectionUtils {
     // the region.
     return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0;
   }
+
+  static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
+    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+        .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
+  }
 }
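The new allOf helper collapses a list of futures into one future that yields the results in input order; if any input future fails, the combined future fails too, since CompletableFuture.allOf propagates the first exception. A usage sketch from inside the client package (the table and gets variables are hypothetical):

// Combine the per-Get futures from the new list-taking get() into a single
// future holding all Results in the original request order.
List<CompletableFuture<Result>> futures = table.get(gets);
CompletableFuture<List<Result>> combined = ConnectionUtils.allOf(futures);
combined.thenAccept(results -> results.forEach(System.out::println));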


@@ -23,6 +23,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 /**
  * A low level asynchronous table.
  * <p>
+ * The implementation is required to be thread safe.
+ * <p>
  * The returned {@code CompletableFuture} will be finished directly in the rpc framework's callback
  * thread, so typically you should not do any time consuming work inside these methods, otherwise
  * you will be likely to block at least one connection to RS(even more if the rpc framework uses
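Given the new thread-safety requirement, a single table instance can be shared freely across threads. A minimal sketch, assuming hypothetical FAMILY, QUALIFIER and VALUE constants:

// One shared RawAsyncTable, used from several threads without external locking.
RawAsyncTable shared = conn.getRawTable(TableName.valueOf("test"));
ExecutorService workers = Executors.newFixedThreadPool(8);
for (int i = 0; i < 8; i++) {
  final int id = i;
  workers.execute(() -> shared
      .put(new Put(Bytes.toBytes("row-" + id)).addColumn(FAMILY, QUALIFIER, VALUE))
      .join());
}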


@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;

+import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;

 import java.io.IOException;
@@ -67,24 +68,35 @@ class RawAsyncTableImpl implements RawAsyncTable {
   private final long defaultScannerMaxResultSize;

-  private long readRpcTimeoutNs;
-  private long writeRpcTimeoutNs;
-  private long operationTimeoutNs;
-  private long scanTimeoutNs;
+  private final long rpcTimeoutNs;
+  private final long readRpcTimeoutNs;
+  private final long writeRpcTimeoutNs;
+  private final long operationTimeoutNs;
+  private final long scanTimeoutNs;
+  private final long pauseNs;
+  private final int maxAttempts;
+  private final int startLogErrorsCnt;

-  public RawAsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) {
+  RawAsyncTableImpl(AsyncConnectionImpl conn, AsyncTableBuilderBase<?> builder) {
     this.conn = conn;
-    this.tableName = tableName;
-    this.readRpcTimeoutNs = conn.connConf.getReadRpcTimeoutNs();
-    this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs();
-    this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs()
-        : conn.connConf.getOperationTimeoutNs();
+    this.tableName = builder.tableName;
+    this.rpcTimeoutNs = builder.rpcTimeoutNs;
+    this.readRpcTimeoutNs = builder.readRpcTimeoutNs;
+    this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs;
+    this.operationTimeoutNs = builder.operationTimeoutNs;
+    this.scanTimeoutNs = builder.scanTimeoutNs;
+    this.pauseNs = builder.pauseNs;
+    this.maxAttempts = builder.maxAttempts;
+    this.startLogErrorsCnt = builder.startLogErrorsCnt;
     this.defaultScannerCaching = conn.connConf.getScannerCaching();
     this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
-    this.scanTimeoutNs = conn.connConf.getScanTimeoutNs();
   }

   @Override
@@ -178,7 +190,9 @@ class RawAsyncTableImpl implements RawAsyncTable {
   private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
     return conn.callerFactory.<T> single().table(tableName).row(row)
         .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS);
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+        .startLogErrorsCnt(startLogErrorsCnt);
   }

   private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
@@ -214,7 +228,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
   @Override
   public CompletableFuture<Result> append(Append append) {
     checkHasFamilies(append);
-    return this.<Result> newCaller(append, writeRpcTimeoutNs)
+    return this.<Result> newCaller(append, rpcTimeoutNs)
         .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
           append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
         .call();
@@ -223,7 +237,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
   @Override
   public CompletableFuture<Result> increment(Increment increment) {
     checkHasFamilies(increment);
-    return this.<Result> newCaller(increment, writeRpcTimeoutNs)
+    return this.<Result> newCaller(increment, rpcTimeoutNs)
         .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
           stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
         .call();
@@ -232,7 +246,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
   @Override
   public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
       CompareOp compareOp, byte[] value, Put put) {
-    return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
+    return this.<Boolean> newCaller(row, rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
           stub, put,
           (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@@ -244,7 +258,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
   @Override
   public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
       CompareOp compareOp, byte[] value, Delete delete) {
-    return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
+    return this.<Boolean> newCaller(row, rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
           loc, stub, delete,
           (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@@ -303,20 +317,18 @@ class RawAsyncTableImpl implements RawAsyncTable {
       RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
       regionMutationBuilder.setAtomic(true);
       return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
-    }, (resp) -> {
-      return null;
-    })).call();
+    }, resp -> null)).call();
   }

   @Override
   public CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
       CompareOp compareOp, byte[] value, RowMutations mutation) {
-    return this.<Boolean> newCaller(mutation, writeRpcTimeoutNs)
+    return this.<Boolean> newCaller(mutation, rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
           stub, mutation,
           (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
             new BinaryComparator(value), CompareType.valueOf(compareOp.name()), rm),
-          (resp) -> resp.getExists()))
+          resp -> resp.getExists()))
         .call();
   }
@@ -349,7 +361,8 @@ class RawAsyncTableImpl implements RawAsyncTable {
     }
     return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan))
         .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
-        .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
+        .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
   }

   public void scan(Scan scan, RawScanResultConsumer consumer) {
@@ -362,13 +375,44 @@ class RawAsyncTableImpl implements RawAsyncTable {
       }
     }
     scan = setDefaultScanConfig(scan);
-    new AsyncClientScanner(scan, consumer, tableName, conn, scanTimeoutNs, readRpcTimeoutNs)
-        .start();
+    new AsyncClientScanner(scan, consumer, tableName, conn, pauseNs, maxAttempts, scanTimeoutNs,
+        readRpcTimeoutNs, startLogErrorsCnt).start();
   }

   @Override
-  public void setReadRpcTimeout(long timeout, TimeUnit unit) {
-    this.readRpcTimeoutNs = unit.toNanos(timeout);
+  public List<CompletableFuture<Result>> get(List<Get> gets) {
+    return batch(gets, readRpcTimeoutNs);
+  }
+
+  @Override
+  public List<CompletableFuture<Void>> put(List<Put> puts) {
+    return voidMutate(puts);
+  }
+
+  @Override
+  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
+    return voidMutate(deletes);
+  }
+
+  @Override
+  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
+    return batch(actions, rpcTimeoutNs);
+  }
+
+  private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
+    return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
+        .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
+  }
+
+  private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
+    return conn.callerFactory.batch().table(tableName).actions(actions)
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
+  }
+
+  @Override
+  public long getRpcTimeout(TimeUnit unit) {
+    return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS);
   }

   @Override
@@ -376,41 +420,18 @@ class RawAsyncTableImpl implements RawAsyncTable {
     return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
   }

-  @Override
-  public void setWriteRpcTimeout(long timeout, TimeUnit unit) {
-    this.writeRpcTimeoutNs = unit.toNanos(timeout);
-  }
-
   @Override
   public long getWriteRpcTimeout(TimeUnit unit) {
     return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
   }

-  @Override
-  public void setOperationTimeout(long timeout, TimeUnit unit) {
-    this.operationTimeoutNs = unit.toNanos(timeout);
-  }
-
   @Override
   public long getOperationTimeout(TimeUnit unit) {
     return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
   }

-  @Override
-  public void setScanTimeout(long timeout, TimeUnit unit) {
-    this.scanTimeoutNs = unit.toNanos(timeout);
-  }
-
   @Override
   public long getScanTimeout(TimeUnit unit) {
-    return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit);
+    return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
   }
-
-  @Override
-  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
-    return conn.callerFactory.batch().table(tableName).actions(actions)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-        .readRpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .writeRpcTimeout(writeRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
-  }
 }
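The new list-taking methods return one future per row rather than a single combined future, so a failed row does not poison the rest of the batch. A usage sketch (the Put instances are hypothetical):

// Each returned future completes or fails independently, after the batch
// caller has already applied the configured retries.
List<CompletableFuture<Void>> futures = table.put(Arrays.asList(put1, put2, put3));
futures.forEach(f -> f.whenComplete((v, e) -> {
  if (e != null) {
    e.printStackTrace(); // per-row error handling
  }
}));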


@@ -875,10 +875,7 @@ public final class HConstants {
   /**
    * timeout for each RPC
-   * @deprecated Use {@link #HBASE_RPC_READ_TIMEOUT_KEY} or {@link #HBASE_RPC_WRITE_TIMEOUT_KEY}
-   *             instead.
    */
-  @Deprecated
   public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";

   /**

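With the table-level setters gone and builder values falling back to connection-level defaults, hbase.rpc.timeout is again the supported way to set a global rpc timeout. A minimal configuration sketch (the value is illustrative):

// Set the connection-wide default rpc timeout; per-table builder settings
// override it where specified.
Configuration conf = HBaseConfiguration.create();
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); // 60 seconds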

@@ -30,16 +30,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -60,7 +58,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
   private static byte[] VALUE = Bytes.toBytes("value");

-  private AsyncConnectionImpl asyncConn;
+  private static AsyncConnectionImpl CONN;

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -68,38 +66,24 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
     TEST_UTIL.getAdmin().setBalancerRunning(false, true);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
   }

   @AfterClass
   public static void tearDownAfterClass() throws Exception {
+    IOUtils.closeQuietly(CONN);
     TEST_UTIL.shutdownMiniCluster();
   }

-  @After
-  public void tearDown() {
-    if (asyncConn != null) {
-      asyncConn.close();
-      asyncConn = null;
-    }
-  }
-
-  private void initConn(int startLogErrorsCnt, long pauseMs, int maxRetires) throws IOException {
-    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
-    conf.setInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, startLogErrorsCnt);
-    conf.setLong(HConstants.HBASE_CLIENT_PAUSE, pauseMs);
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, maxRetires);
-    asyncConn = new AsyncConnectionImpl(conf, User.getCurrent());
-  }
-
   @Test
   public void testRegionMove() throws InterruptedException, ExecutionException, IOException {
-    initConn(0, 100, 30);
     // This will leave a cached entry in location cache
-    HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
+    HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
     int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegionInfo().getRegionName());
     TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes(
       TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName()));
-    RawAsyncTable table = asyncConn.getRawTable(TABLE_NAME);
+    RawAsyncTable table = CONN.getRawTableBuilder(TABLE_NAME)
+        .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(30).build();

     table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();

     // move back
@@ -117,9 +101,9 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
   @Test
   public void testMaxRetries() throws IOException, InterruptedException {
-    initConn(0, 10, 2);
     try {
-      asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
+      CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
+          .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
           .action((controller, loc, stub) -> failedFuture()).call().get();
       fail();
     } catch (ExecutionException e) {
@@ -129,14 +113,14 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
   @Test
   public void testOperationTimeout() throws IOException, InterruptedException {
-    initConn(0, 100, Integer.MAX_VALUE);
     long startNs = System.nanoTime();
     try {
-      asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW)
-          .operationTimeout(1, TimeUnit.SECONDS).action((controller, loc, stub) -> failedFuture())
-          .call().get();
+      CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS)
+          .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
+          .action((controller, loc, stub) -> failedFuture()).call().get();
       fail();
     } catch (ExecutionException e) {
+      e.printStackTrace();
       assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
     }
     long costNs = System.nanoTime() - startNs;
@@ -146,12 +130,11 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
   @Test
   public void testLocateError() throws IOException, InterruptedException, ExecutionException {
-    initConn(0, 100, 5);
     AtomicBoolean errorTriggered = new AtomicBoolean(false);
     AtomicInteger count = new AtomicInteger(0);
-    HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
+    HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
     AsyncRegionLocator mockedLocator =
-        new AsyncRegionLocator(asyncConn, AsyncConnectionImpl.RETRY_TIMER) {
+        new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
           @Override
           CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
               RegionLocateType locateType, long timeoutNs) {
@@ -174,14 +157,15 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
       }
     };
     try (AsyncConnectionImpl mockedConn =
-        new AsyncConnectionImpl(asyncConn.getConfiguration(), User.getCurrent()) {
+        new AsyncConnectionImpl(CONN.getConfiguration(), User.getCurrent()) {
           @Override
           AsyncRegionLocator getLocator() {
             return mockedLocator;
           }
         }) {
-      RawAsyncTable table = new RawAsyncTableImpl(mockedConn, TABLE_NAME);
+      RawAsyncTable table = mockedConn.getRawTableBuilder(TABLE_NAME)
+          .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
       table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
       assertTrue(errorTriggered.get());
       errorTriggered.set(false);


@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase.client;

 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
-import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
-import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.TABLES_ON_MASTER;
 import static org.junit.Assert.assertEquals;
@@ -33,6 +31,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -72,6 +71,8 @@ public class TestAsyncTableGetMultiThreaded {
   private static AsyncConnection CONN;

+  private static RawAsyncTable TABLE;
+
   private static byte[][] SPLIT_KEYS;

   @BeforeClass
@@ -79,11 +80,10 @@ public class TestAsyncTableGetMultiThreaded {
     setUp(HColumnDescriptor.MemoryCompaction.NONE);
   }

-  protected static void setUp(HColumnDescriptor.MemoryCompaction memoryCompaction) throws Exception {
+  protected static void setUp(HColumnDescriptor.MemoryCompaction memoryCompaction)
+      throws Exception {
     TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
     TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
-    TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
-    TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
     TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
     TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
       String.valueOf(memoryCompaction));
@@ -96,8 +96,9 @@ public class TestAsyncTableGetMultiThreaded {
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
     CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
-    CONN.getRawTable(TABLE_NAME)
-        .putAll(
+    TABLE = CONN.getRawTableBuilder(TABLE_NAME).setReadRpcTimeout(1, TimeUnit.SECONDS)
+        .setMaxRetries(1000).build();
+    TABLE.putAll(
       IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
           .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))).collect(Collectors.toList()))
         .get();
@@ -112,11 +113,8 @@ public class TestAsyncTableGetMultiThreaded {
   private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
     while (!stop.get()) {
       for (int i = 0; i < COUNT; i++) {
-        assertEquals(i,
-            Bytes.toInt(
-              CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i))))
-                  .get()
-                  .getValue(FAMILY, QUALIFIER)));
+        assertEquals(i, Bytes.toInt(TABLE.get(new Get(Bytes.toBytes(String.format("%03d", i))))
+            .get().getValue(FAMILY, QUALIFIER)));
       }
     }
   }


@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Queue;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;

 import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -122,10 +121,7 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
   @Override
   protected List<Result> doScan(Scan scan) throws Exception {
     SimpleRawScanResultConsumer scanConsumer = new SimpleRawScanResultConsumer();
-    RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
-    table.setScanTimeout(1, TimeUnit.HOURS);
-    table.setReadRpcTimeout(1, TimeUnit.HOURS);
-    table.scan(scan, scanConsumer);
+    ASYNC_CONN.getRawTable(TABLE_NAME).scan(scan, scanConsumer);
     List<Result> results = new ArrayList<>();
     for (Result result; (result = scanConsumer.take()) != null;) {
       results.add(result);