HBASE-17141 Introduce a more user-friendly implementation of AsyncTable
This commit is contained in:
parent
b297f2dae1
commit
6ff19f94fe
|
@ -17,10 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||
|
||||
|
@ -52,76 +48,4 @@ public abstract class AbstractClientScanner implements ResultScanner {
|
|||
public ScanMetrics getScanMetrics() {
|
||||
return scanMetrics;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get nbRows rows.
|
||||
* How many RPCs are made is determined by the {@link Scan#setCaching(int)}
|
||||
* setting (or hbase.client.scanner.caching in hbase-site.xml).
|
||||
* @param nbRows number of rows to return
|
||||
* @return Between zero and nbRows rowResults. Scan is done
|
||||
* if returned array is of zero-length (We never return null).
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public Result [] next(int nbRows) throws IOException {
|
||||
// Collect values to be returned here
|
||||
ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
|
||||
for(int i = 0; i < nbRows; i++) {
|
||||
Result next = next();
|
||||
if (next != null) {
|
||||
resultSets.add(next);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return resultSets.toArray(new Result[resultSets.size()]);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Result> iterator() {
|
||||
return new Iterator<Result>() {
|
||||
// The next RowResult, possibly pre-read
|
||||
Result next = null;
|
||||
|
||||
// return true if there is another item pending, false if there isn't.
|
||||
// this method is where the actual advancing takes place, but you need
|
||||
// to call next() to consume it. hasNext() will only advance if there
|
||||
// isn't a pending next().
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
if (next == null) {
|
||||
try {
|
||||
next = AbstractClientScanner.this.next();
|
||||
return next != null;
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// get the pending next item and advance the iterator. returns null if
|
||||
// there is no next item.
|
||||
@Override
|
||||
public Result next() {
|
||||
// since hasNext() does the real advancing, we call this to determine
|
||||
// if there is a next before proceeding.
|
||||
if (!hasNext()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// if we get to here, then hasNext() has given us an item to return.
|
||||
// we want to return the item and then null out the next pointer, so
|
||||
// we use a temporary variable.
|
||||
Result temp = next;
|
||||
next = null;
|
||||
return temp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,8 +21,6 @@ import java.io.IOException;
|
|||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -39,29 +37,7 @@ class AllowPartialScanResultCache implements ScanResultCache {
|
|||
private Cell lastCell;
|
||||
|
||||
private Result filterCells(Result result) {
|
||||
if (lastCell == null) {
|
||||
return result;
|
||||
}
|
||||
|
||||
// not the same row
|
||||
if (!CellUtil.matchingRow(lastCell, result.getRow(), 0, result.getRow().length)) {
|
||||
return result;
|
||||
}
|
||||
Cell[] rawCells = result.rawCells();
|
||||
int index = Arrays.binarySearch(rawCells, lastCell, CellComparator::compareWithoutRow);
|
||||
if (index < 0) {
|
||||
index = -index - 1;
|
||||
} else {
|
||||
index++;
|
||||
}
|
||||
if (index == 0) {
|
||||
return result;
|
||||
}
|
||||
if (index == rawCells.length) {
|
||||
return null;
|
||||
}
|
||||
return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
|
||||
result.isStale(), true);
|
||||
return lastCell == null ? result : ConnectionUtils.filterCells(result, lastCell);
|
||||
}
|
||||
|
||||
private void updateLastCell(Result result) {
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -48,6 +49,18 @@ public interface AsyncConnection extends Closeable {
|
|||
*/
|
||||
AsyncTableRegionLocator getRegionLocator(TableName tableName);
|
||||
|
||||
/**
|
||||
* Retrieve an RawAsyncTable implementation for accessing a table. The returned Table is not
|
||||
* thread safe, a new instance should be created for each using thread. This is a lightweight
|
||||
* operation, pooling or caching of the returned AsyncTable is neither required nor desired.
|
||||
* <p>
|
||||
* This method no longer checks table existence. An exception will be thrown if the table does not
|
||||
* exist only when the first operation is attempted.
|
||||
* @param tableName the name of the table
|
||||
* @return an RawAsyncTable to use for interactions with this table
|
||||
*/
|
||||
RawAsyncTable getRawTable(TableName tableName);
|
||||
|
||||
/**
|
||||
* Retrieve an AsyncTable implementation for accessing a table. The returned Table is not thread
|
||||
* safe, a new instance should be created for each using thread. This is a lightweight operation,
|
||||
|
@ -56,7 +69,8 @@ public interface AsyncConnection extends Closeable {
|
|||
* This method no longer checks table existence. An exception will be thrown if the table does not
|
||||
* exist only when the first operation is attempted.
|
||||
* @param tableName the name of the table
|
||||
* @param pool the thread pool to use for executing callback
|
||||
* @return an AsyncTable to use for interactions with this table
|
||||
*/
|
||||
AsyncTable getTable(TableName tableName);
|
||||
AsyncTable getTable(TableName tableName, ExecutorService pool);
|
||||
}
|
|
@ -30,6 +30,7 @@ import java.io.IOException;
|
|||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
@ -148,7 +149,12 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
}
|
||||
|
||||
@Override
|
||||
public AsyncTable getTable(TableName tableName) {
|
||||
return new AsyncTableImpl(this, tableName);
|
||||
public RawAsyncTable getRawTable(TableName tableName) {
|
||||
return new RawAsyncTableImpl(this, tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncTable getTable(TableName tableName, ExecutorService pool) {
|
||||
return new AsyncTableImpl(this, tableName, pool);
|
||||
}
|
||||
}
|
|
@ -262,7 +262,7 @@ class AsyncRegionLocator {
|
|||
}
|
||||
CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
|
||||
byte[] metaKey = createRegionName(tableName, row, NINES, false);
|
||||
conn.getTable(META_TABLE_NAME)
|
||||
conn.getRawTable(META_TABLE_NAME)
|
||||
.smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
|
||||
.whenComplete(
|
||||
(results, error) -> onScanComplete(future, tableName, row, results, error, "row", loc -> {
|
||||
|
@ -327,7 +327,7 @@ class AsyncRegionLocator {
|
|||
metaKey = createRegionName(tableName, startRowOfCurrentRegion, ZEROES, false);
|
||||
}
|
||||
CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
|
||||
conn.getTable(META_TABLE_NAME)
|
||||
conn.getRawTable(META_TABLE_NAME)
|
||||
.smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1)
|
||||
.whenComplete((results, error) -> onScanComplete(future, tableName, startRowOfCurrentRegion,
|
||||
results, error, "startRowOfCurrentRegion", loc -> {
|
||||
|
|
|
@ -17,359 +17,17 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* The asynchronous version of Table. Obtain an instance from a {@link AsyncConnection}.
|
||||
* The asynchronous table for normal users.
|
||||
* <p>
|
||||
* The implementation is NOT required to be thread safe. Do NOT access it from multiple threads
|
||||
* concurrently.
|
||||
* <p>
|
||||
* Usually the implementations will not throw any exception directly, you need to get the exception
|
||||
* from the returned {@link CompletableFuture}.
|
||||
* The implementation should make sure that user can do everything they want to the returned
|
||||
* {@code CompletableFuture} without break anything. Usually the implementation will require user to
|
||||
* provide a {@code ExecutorService}.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
public interface AsyncTable {
|
||||
|
||||
/**
|
||||
* Gets the fully qualified table name instance of this table.
|
||||
*/
|
||||
TableName getName();
|
||||
|
||||
/**
|
||||
* Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
|
||||
* <p>
|
||||
* The reference returned is not a copy, so any change made to it will affect this instance.
|
||||
*/
|
||||
Configuration getConfiguration();
|
||||
|
||||
/**
|
||||
* Set timeout of each rpc read request in operations of this Table instance, will override the
|
||||
* value of {@code hbase.rpc.read.timeout} in configuration. If a rpc read request waiting too
|
||||
* long, it will stop waiting and send a new request to retry until retries exhausted or operation
|
||||
* timeout reached.
|
||||
*/
|
||||
void setReadRpcTimeout(long timeout, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Get timeout of each rpc read request in this Table instance.
|
||||
*/
|
||||
long getReadRpcTimeout(TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Set timeout of each rpc write request in operations of this Table instance, will override the
|
||||
* value of {@code hbase.rpc.write.timeout} in configuration. If a rpc write request waiting too
|
||||
* long, it will stop waiting and send a new request to retry until retries exhausted or operation
|
||||
* timeout reached.
|
||||
*/
|
||||
void setWriteRpcTimeout(long timeout, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Get timeout of each rpc write request in this Table instance.
|
||||
*/
|
||||
long getWriteRpcTimeout(TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Set timeout of each operation in this Table instance, will override the value of
|
||||
* {@code hbase.client.operation.timeout} in configuration.
|
||||
* <p>
|
||||
* Operation timeout is a top-level restriction that makes sure an operation will not be blocked
|
||||
* more than this. In each operation, if rpc request fails because of timeout or other reason, it
|
||||
* will retry until success or throw a RetriesExhaustedException. But if the total time elapsed
|
||||
* reach the operation timeout before retries exhausted, it will break early and throw
|
||||
* SocketTimeoutException.
|
||||
*/
|
||||
void setOperationTimeout(long timeout, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Get timeout of each operation in Table instance.
|
||||
*/
|
||||
long getOperationTimeout(TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Set timeout of a single operation in a scan, such as openScanner and next. Will override the
|
||||
* value {@code hbase.client.scanner.timeout.period} in configuration.
|
||||
* <p>
|
||||
* Generally a scan will never timeout after we add heartbeat support unless the region is
|
||||
* crashed. The {@code scanTimeout} works like the {@code operationTimeout} for each single
|
||||
* operation in a scan.
|
||||
*/
|
||||
void setScanTimeout(long timeout, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Get the timeout of a single operation in a scan.
|
||||
*/
|
||||
long getScanTimeout(TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Test for the existence of columns in the table, as specified by the Get.
|
||||
* <p>
|
||||
* This will return true if the Get matches one or more keys, false if not.
|
||||
* <p>
|
||||
* This is a server-side call so it prevents any data from being transfered to the client.
|
||||
* @return true if the specified Get matches one or more keys, false if not. The return value will
|
||||
* be wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Boolean> exists(Get get) {
|
||||
if (!get.isCheckExistenceOnly()) {
|
||||
get = ReflectionUtils.newInstance(get.getClass(), get);
|
||||
get.setCheckExistenceOnly(true);
|
||||
}
|
||||
return get(get).thenApply(r -> r.getExists());
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts certain cells from a given row.
|
||||
* @param get The object that specifies what data to fetch and from which row.
|
||||
* @return The data coming from the specified row, if it exists. If the row specified doesn't
|
||||
* exist, the {@link Result} instance returned won't contain any
|
||||
* {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The
|
||||
* return value will be wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Result> get(Get get);
|
||||
|
||||
/**
|
||||
* Puts some data to the table.
|
||||
* @param put The data to put.
|
||||
* @return A {@link CompletableFuture} that always returns null when complete normally.
|
||||
*/
|
||||
CompletableFuture<Void> put(Put put);
|
||||
|
||||
/**
|
||||
* Deletes the specified cells/row.
|
||||
* @param delete The object that specifies what to delete.
|
||||
* @return A {@link CompletableFuture} that always returns null when complete normally.
|
||||
*/
|
||||
CompletableFuture<Void> delete(Delete delete);
|
||||
|
||||
/**
|
||||
* Appends values to one or more columns within a single row.
|
||||
* <p>
|
||||
* This operation does not appear atomic to readers. Appends are done under a single row lock, so
|
||||
* write operations to a row are synchronized, but readers do not take row locks so get and scan
|
||||
* operations can see this operation partially completed.
|
||||
* @param append object that specifies the columns and amounts to be used for the increment
|
||||
* operations
|
||||
* @return values of columns after the append operation (maybe null). The return value will be
|
||||
* wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Result> append(Append append);
|
||||
|
||||
/**
|
||||
* Increments one or more columns within a single row.
|
||||
* <p>
|
||||
* This operation does not appear atomic to readers. Increments are done under a single row lock,
|
||||
* so write operations to a row are synchronized, but readers do not take row locks so get and
|
||||
* scan operations can see this operation partially completed.
|
||||
* @param increment object that specifies the columns and amounts to be used for the increment
|
||||
* operations
|
||||
* @return values of columns after the increment. The return value will be wrapped by a
|
||||
* {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Result> increment(Increment increment);
|
||||
|
||||
/**
|
||||
* See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
|
||||
* <p>
|
||||
* The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
|
||||
* @param row The row that contains the cell to increment.
|
||||
* @param family The column family of the cell to increment.
|
||||
* @param qualifier The column qualifier of the cell to increment.
|
||||
* @param amount The amount to increment the cell with (or decrement, if the amount is negative).
|
||||
* @return The new value, post increment. The return value will be wrapped by a
|
||||
* {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
|
||||
long amount) {
|
||||
return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically increments a column value. If the column value already exists and is not a
|
||||
* big-endian long, this could throw an exception. If the column value does not yet exist it is
|
||||
* initialized to <code>amount</code> and written to the specified column.
|
||||
* <p>
|
||||
* Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose
|
||||
* any increments that have not been flushed.
|
||||
* @param row The row that contains the cell to increment.
|
||||
* @param family The column family of the cell to increment.
|
||||
* @param qualifier The column qualifier of the cell to increment.
|
||||
* @param amount The amount to increment the cell with (or decrement, if the amount is negative).
|
||||
* @param durability The persistence guarantee for this increment.
|
||||
* @return The new value, post increment. The return value will be wrapped by a
|
||||
* {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
|
||||
long amount, Durability durability) {
|
||||
Preconditions.checkNotNull(row, "row is null");
|
||||
Preconditions.checkNotNull(family, "family is null");
|
||||
Preconditions.checkNotNull(qualifier, "qualifier is null");
|
||||
return increment(
|
||||
new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
|
||||
.thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
|
||||
* adds the put. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param value the expected value
|
||||
* @param put data to put if check succeeds
|
||||
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
|
||||
* a {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Put put) {
|
||||
return checkAndPut(row, family, qualifier, CompareOp.EQUAL, value, put);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
|
||||
* adds the put. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param compareOp comparison operator to use
|
||||
* @param value the expected value
|
||||
* @param put data to put if check succeeds
|
||||
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
|
||||
* a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, Put put);
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
|
||||
* adds the delete. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param value the expected value
|
||||
* @param delete data to delete if check succeeds
|
||||
* @return true if the new delete was executed, false otherwise. The return value will be wrapped
|
||||
* by a {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Delete delete) {
|
||||
return checkAndDelete(row, family, qualifier, CompareOp.EQUAL, value, delete);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
|
||||
* adds the delete. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param compareOp comparison operator to use
|
||||
* @param value the expected value
|
||||
* @param delete data to delete if check succeeds
|
||||
* @return true if the new delete was executed, false otherwise. The return value will be wrapped
|
||||
* by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, Delete delete);
|
||||
|
||||
/**
|
||||
* Performs multiple mutations atomically on a single row. Currently {@link Put} and
|
||||
* {@link Delete} are supported.
|
||||
* @param mutation object that specifies the set of mutations to perform atomically
|
||||
* @return A {@link CompletableFuture} that always returns null when complete normally.
|
||||
*/
|
||||
CompletableFuture<Void> mutateRow(RowMutations mutation);
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
|
||||
* performs the row mutations. If the passed value is null, the check is for the lack of column
|
||||
* (ie: non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param value the expected value
|
||||
* @param mutation mutations to perform if check succeeds
|
||||
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
|
||||
* a {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, RowMutations mutation) {
|
||||
return checkAndMutate(row, family, qualifier, CompareOp.EQUAL, value, mutation);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
|
||||
* performs the row mutations. If the passed value is null, the check is for the lack of column
|
||||
* (ie: non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param compareOp the comparison operator
|
||||
* @param value the expected value
|
||||
* @param mutation mutations to perform if check succeeds
|
||||
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
|
||||
* a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, RowMutations mutation);
|
||||
|
||||
/**
|
||||
* Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}.
|
||||
* @see #smallScan(Scan, int)
|
||||
*/
|
||||
default CompletableFuture<List<Result>> smallScan(Scan scan) {
|
||||
return smallScan(scan, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return all the results that match the given scan object. The number of the returned results
|
||||
* will not be greater than {@code limit}.
|
||||
* <p>
|
||||
* Notice that the scan must be small, and should not use batch or allowPartialResults. The
|
||||
* {@code caching} property of the scan object is also ignored as we will use {@code limit}
|
||||
* instead.
|
||||
* @param scan A configured {@link Scan} object.
|
||||
* @param limit the limit of results count
|
||||
* @return The results of this small scan operation. The return value will be wrapped by a
|
||||
* {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
|
||||
|
||||
/**
|
||||
* The basic scan API uses the observer pattern. All results that match the given scan object will
|
||||
* be passed to the given {@code consumer} by calling {@link ScanResultConsumer#onNext(Result[])}.
|
||||
* {@link ScanResultConsumer#onComplete()} means the scan is finished, and
|
||||
* {@link ScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan
|
||||
* is terminated. {@link ScanResultConsumer#onHeartbeat()} means the RS is still working but we
|
||||
* can not get a valid result to call {@link ScanResultConsumer#onNext(Result[])}. This is usually
|
||||
* because the matched results are too sparse, for example, a filter which almost filters out
|
||||
* everything is specified.
|
||||
* <p>
|
||||
* Notice that, the methods of the given {@code consumer} will be called directly in the rpc
|
||||
* framework's callback thread, so typically you should not do any time consuming work inside
|
||||
* these methods, otherwise you will be likely to block at least one connection to RS(even more if
|
||||
* the rpc framework uses NIO).
|
||||
* <p>
|
||||
* This method is only for experts, do <strong>NOT</strong> use this method if you have other
|
||||
* choice.
|
||||
* @param scan A configured {@link Scan} object.
|
||||
* @param consumer the consumer used to receive results.
|
||||
*/
|
||||
void scan(Scan scan, ScanResultConsumer consumer);
|
||||
public interface AsyncTable extends AsyncTableBase {
|
||||
}
|
||||
|
|
|
@ -0,0 +1,354 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* The base interface for asynchronous version of Table. Obtain an instance from a
|
||||
* {@link AsyncConnection}.
|
||||
* <p>
|
||||
* The implementation is NOT required to be thread safe. Do NOT access it from multiple threads
|
||||
* concurrently.
|
||||
* <p>
|
||||
* Usually the implementations will not throw any exception directly, you need to get the exception
|
||||
* from the returned {@link CompletableFuture}.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
public interface AsyncTableBase {
|
||||
|
||||
/**
|
||||
* Gets the fully qualified table name instance of this table.
|
||||
*/
|
||||
TableName getName();
|
||||
|
||||
/**
|
||||
* Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
|
||||
* <p>
|
||||
* The reference returned is not a copy, so any change made to it will affect this instance.
|
||||
*/
|
||||
Configuration getConfiguration();
|
||||
|
||||
/**
|
||||
* Set timeout of each rpc read request in operations of this Table instance, will override the
|
||||
* value of {@code hbase.rpc.read.timeout} in configuration. If a rpc read request waiting too
|
||||
* long, it will stop waiting and send a new request to retry until retries exhausted or operation
|
||||
* timeout reached.
|
||||
*/
|
||||
void setReadRpcTimeout(long timeout, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Get timeout of each rpc read request in this Table instance.
|
||||
*/
|
||||
long getReadRpcTimeout(TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Set timeout of each rpc write request in operations of this Table instance, will override the
|
||||
* value of {@code hbase.rpc.write.timeout} in configuration. If a rpc write request waiting too
|
||||
* long, it will stop waiting and send a new request to retry until retries exhausted or operation
|
||||
* timeout reached.
|
||||
*/
|
||||
void setWriteRpcTimeout(long timeout, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Get timeout of each rpc write request in this Table instance.
|
||||
*/
|
||||
long getWriteRpcTimeout(TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Set timeout of each operation in this Table instance, will override the value of
|
||||
* {@code hbase.client.operation.timeout} in configuration.
|
||||
* <p>
|
||||
* Operation timeout is a top-level restriction that makes sure an operation will not be blocked
|
||||
* more than this. In each operation, if rpc request fails because of timeout or other reason, it
|
||||
* will retry until success or throw a RetriesExhaustedException. But if the total time elapsed
|
||||
* reach the operation timeout before retries exhausted, it will break early and throw
|
||||
* SocketTimeoutException.
|
||||
*/
|
||||
void setOperationTimeout(long timeout, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Get timeout of each operation in Table instance.
|
||||
*/
|
||||
long getOperationTimeout(TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Set timeout of a single operation in a scan, such as openScanner and next. Will override the
|
||||
* value {@code hbase.client.scanner.timeout.period} in configuration.
|
||||
* <p>
|
||||
* Generally a scan will never timeout after we add heartbeat support unless the region is
|
||||
* crashed. The {@code scanTimeout} works like the {@code operationTimeout} for each single
|
||||
* operation in a scan.
|
||||
*/
|
||||
void setScanTimeout(long timeout, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Get the timeout of a single operation in a scan.
|
||||
*/
|
||||
long getScanTimeout(TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Test for the existence of columns in the table, as specified by the Get.
|
||||
* <p>
|
||||
* This will return true if the Get matches one or more keys, false if not.
|
||||
* <p>
|
||||
* This is a server-side call so it prevents any data from being transfered to the client.
|
||||
* @return true if the specified Get matches one or more keys, false if not. The return value will
|
||||
* be wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Boolean> exists(Get get) {
|
||||
if (!get.isCheckExistenceOnly()) {
|
||||
get = ReflectionUtils.newInstance(get.getClass(), get);
|
||||
get.setCheckExistenceOnly(true);
|
||||
}
|
||||
return get(get).thenApply(r -> r.getExists());
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts certain cells from a given row.
|
||||
* @param get The object that specifies what data to fetch and from which row.
|
||||
* @return The data coming from the specified row, if it exists. If the row specified doesn't
|
||||
* exist, the {@link Result} instance returned won't contain any
|
||||
* {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The
|
||||
* return value will be wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Result> get(Get get);
|
||||
|
||||
/**
|
||||
* Puts some data to the table.
|
||||
* @param put The data to put.
|
||||
* @return A {@link CompletableFuture} that always returns null when complete normally.
|
||||
*/
|
||||
CompletableFuture<Void> put(Put put);
|
||||
|
||||
/**
|
||||
* Deletes the specified cells/row.
|
||||
* @param delete The object that specifies what to delete.
|
||||
* @return A {@link CompletableFuture} that always returns null when complete normally.
|
||||
*/
|
||||
CompletableFuture<Void> delete(Delete delete);
|
||||
|
||||
/**
|
||||
* Appends values to one or more columns within a single row.
|
||||
* <p>
|
||||
* This operation does not appear atomic to readers. Appends are done under a single row lock, so
|
||||
* write operations to a row are synchronized, but readers do not take row locks so get and scan
|
||||
* operations can see this operation partially completed.
|
||||
* @param append object that specifies the columns and amounts to be used for the increment
|
||||
* operations
|
||||
* @return values of columns after the append operation (maybe null). The return value will be
|
||||
* wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Result> append(Append append);
|
||||
|
||||
/**
|
||||
* Increments one or more columns within a single row.
|
||||
* <p>
|
||||
* This operation does not appear atomic to readers. Increments are done under a single row lock,
|
||||
* so write operations to a row are synchronized, but readers do not take row locks so get and
|
||||
* scan operations can see this operation partially completed.
|
||||
* @param increment object that specifies the columns and amounts to be used for the increment
|
||||
* operations
|
||||
* @return values of columns after the increment. The return value will be wrapped by a
|
||||
* {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Result> increment(Increment increment);
|
||||
|
||||
/**
|
||||
* See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
|
||||
* <p>
|
||||
* The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
|
||||
* @param row The row that contains the cell to increment.
|
||||
* @param family The column family of the cell to increment.
|
||||
* @param qualifier The column qualifier of the cell to increment.
|
||||
* @param amount The amount to increment the cell with (or decrement, if the amount is negative).
|
||||
* @return The new value, post increment. The return value will be wrapped by a
|
||||
* {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
|
||||
long amount) {
|
||||
return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically increments a column value. If the column value already exists and is not a
|
||||
* big-endian long, this could throw an exception. If the column value does not yet exist it is
|
||||
* initialized to <code>amount</code> and written to the specified column.
|
||||
* <p>
|
||||
* Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose
|
||||
* any increments that have not been flushed.
|
||||
* @param row The row that contains the cell to increment.
|
||||
* @param family The column family of the cell to increment.
|
||||
* @param qualifier The column qualifier of the cell to increment.
|
||||
* @param amount The amount to increment the cell with (or decrement, if the amount is negative).
|
||||
* @param durability The persistence guarantee for this increment.
|
||||
* @return The new value, post increment. The return value will be wrapped by a
|
||||
* {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
|
||||
long amount, Durability durability) {
|
||||
Preconditions.checkNotNull(row, "row is null");
|
||||
Preconditions.checkNotNull(family, "family is null");
|
||||
Preconditions.checkNotNull(qualifier, "qualifier is null");
|
||||
return increment(
|
||||
new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
|
||||
.thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
|
||||
* adds the put. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param value the expected value
|
||||
* @param put data to put if check succeeds
|
||||
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
|
||||
* a {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Put put) {
|
||||
return checkAndPut(row, family, qualifier, CompareOp.EQUAL, value, put);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
|
||||
* adds the put. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param compareOp comparison operator to use
|
||||
* @param value the expected value
|
||||
* @param put data to put if check succeeds
|
||||
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
|
||||
* a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, Put put);
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
|
||||
* adds the delete. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param value the expected value
|
||||
* @param delete data to delete if check succeeds
|
||||
* @return true if the new delete was executed, false otherwise. The return value will be wrapped
|
||||
* by a {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Delete delete) {
|
||||
return checkAndDelete(row, family, qualifier, CompareOp.EQUAL, value, delete);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
|
||||
* adds the delete. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param compareOp comparison operator to use
|
||||
* @param value the expected value
|
||||
* @param delete data to delete if check succeeds
|
||||
* @return true if the new delete was executed, false otherwise. The return value will be wrapped
|
||||
* by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, Delete delete);
|
||||
|
||||
/**
|
||||
* Performs multiple mutations atomically on a single row. Currently {@link Put} and
|
||||
* {@link Delete} are supported.
|
||||
* @param mutation object that specifies the set of mutations to perform atomically
|
||||
* @return A {@link CompletableFuture} that always returns null when complete normally.
|
||||
*/
|
||||
CompletableFuture<Void> mutateRow(RowMutations mutation);
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
|
||||
* performs the row mutations. If the passed value is null, the check is for the lack of column
|
||||
* (ie: non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param value the expected value
|
||||
* @param mutation mutations to perform if check succeeds
|
||||
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
|
||||
* a {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, RowMutations mutation) {
|
||||
return checkAndMutate(row, family, qualifier, CompareOp.EQUAL, value, mutation);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
|
||||
* performs the row mutations. If the passed value is null, the check is for the lack of column
|
||||
* (ie: non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param compareOp the comparison operator
|
||||
* @param value the expected value
|
||||
* @param mutation mutations to perform if check succeeds
|
||||
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
|
||||
* a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, RowMutations mutation);
|
||||
|
||||
/**
|
||||
* Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}.
|
||||
* @see #smallScan(Scan, int)
|
||||
*/
|
||||
default CompletableFuture<List<Result>> smallScan(Scan scan) {
|
||||
return smallScan(scan, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return all the results that match the given scan object. The number of the returned results
|
||||
* will not be greater than {@code limit}.
|
||||
* <p>
|
||||
* Notice that the scan must be small, and should not use batch or allowPartialResults. The
|
||||
* {@code caching} property of the scan object is also ignored as we will use {@code limit}
|
||||
* instead.
|
||||
* @param scan A configured {@link Scan} object.
|
||||
* @param limit the limit of results count
|
||||
* @return The results of this small scan operation. The return value will be wrapped by a
|
||||
* {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
|
||||
}
|
|
@ -17,392 +17,143 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* The implementation of AsyncTable.
|
||||
* The implementation of AsyncTable. Based on {@link RawAsyncTable}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class AsyncTableImpl implements AsyncTable {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(AsyncTableImpl.class);
|
||||
private final RawAsyncTable rawTable;
|
||||
|
||||
private final AsyncConnectionImpl conn;
|
||||
private final ExecutorService pool;
|
||||
|
||||
private final TableName tableName;
|
||||
|
||||
private final int defaultScannerCaching;
|
||||
|
||||
private final long defaultScannerMaxResultSize;
|
||||
|
||||
private long readRpcTimeoutNs;
|
||||
|
||||
private long writeRpcTimeoutNs;
|
||||
|
||||
private long operationTimeoutNs;
|
||||
|
||||
private long scanTimeoutNs;
|
||||
|
||||
public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) {
|
||||
this.conn = conn;
|
||||
this.tableName = tableName;
|
||||
this.readRpcTimeoutNs = conn.connConf.getReadRpcTimeoutNs();
|
||||
this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs();
|
||||
this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs()
|
||||
: conn.connConf.getOperationTimeoutNs();
|
||||
this.defaultScannerCaching = conn.connConf.getScannerCaching();
|
||||
this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
|
||||
this.scanTimeoutNs = conn.connConf.getScanTimeoutNs();
|
||||
public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName, ExecutorService pool) {
|
||||
this.rawTable = conn.getRawTable(tableName);
|
||||
this.pool = pool;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableName getName() {
|
||||
return tableName;
|
||||
return rawTable.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return conn.getConfiguration();
|
||||
return rawTable.getConfiguration();
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
private interface Converter<D, I, S> {
|
||||
D convert(I info, S src) throws IOException;
|
||||
@Override
|
||||
public void setReadRpcTimeout(long timeout, TimeUnit unit) {
|
||||
rawTable.setReadRpcTimeout(timeout, unit);
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
private interface RpcCall<RESP, REQ> {
|
||||
void call(ClientService.Interface stub, HBaseRpcController controller, REQ req,
|
||||
RpcCallback<RESP> done);
|
||||
@Override
|
||||
public long getReadRpcTimeout(TimeUnit unit) {
|
||||
return rawTable.getReadRpcTimeout(unit);
|
||||
}
|
||||
|
||||
private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(
|
||||
HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
|
||||
Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall,
|
||||
Converter<RESP, HBaseRpcController, PRESP> respConverter) {
|
||||
CompletableFuture<RESP> future = new CompletableFuture<>();
|
||||
try {
|
||||
rpcCall.call(stub, controller, reqConvert.convert(loc.getRegionInfo().getRegionName(), req),
|
||||
new RpcCallback<PRESP>() {
|
||||
|
||||
@Override
|
||||
public void run(PRESP resp) {
|
||||
if (controller.failed()) {
|
||||
future.completeExceptionally(controller.getFailed());
|
||||
} else {
|
||||
try {
|
||||
future.complete(respConverter.convert(controller, resp));
|
||||
} catch (IOException e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
return future;
|
||||
@Override
|
||||
public void setWriteRpcTimeout(long timeout, TimeUnit unit) {
|
||||
rawTable.setWriteRpcTimeout(timeout, unit);
|
||||
}
|
||||
|
||||
private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
|
||||
HRegionLocation loc, ClientService.Interface stub, REQ req,
|
||||
Converter<MutateRequest, byte[], REQ> reqConvert,
|
||||
Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
|
||||
return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done),
|
||||
respConverter);
|
||||
@Override
|
||||
public long getWriteRpcTimeout(TimeUnit unit) {
|
||||
return rawTable.getWriteRpcTimeout(unit);
|
||||
}
|
||||
|
||||
private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
|
||||
HRegionLocation loc, ClientService.Interface stub, REQ req,
|
||||
Converter<MutateRequest, byte[], REQ> reqConvert) {
|
||||
return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
|
||||
return null;
|
||||
});
|
||||
@Override
|
||||
public void setOperationTimeout(long timeout, TimeUnit unit) {
|
||||
rawTable.setOperationTimeout(timeout, unit);
|
||||
}
|
||||
|
||||
private static Result toResult(HBaseRpcController controller, MutateResponse resp)
|
||||
throws IOException {
|
||||
if (!resp.hasResult()) {
|
||||
return null;
|
||||
}
|
||||
return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner());
|
||||
@Override
|
||||
public long getOperationTimeout(TimeUnit unit) {
|
||||
return rawTable.getOperationTimeout(unit);
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
private interface NoncedConverter<D, I, S> {
|
||||
D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
|
||||
@Override
|
||||
public void setScanTimeout(long timeout, TimeUnit unit) {
|
||||
rawTable.setScanTimeout(timeout, unit);
|
||||
}
|
||||
|
||||
private <REQ, RESP> CompletableFuture<RESP> noncedMutate(HBaseRpcController controller,
|
||||
HRegionLocation loc, ClientService.Interface stub, REQ req,
|
||||
NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
|
||||
Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
|
||||
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
|
||||
long nonce = conn.getNonceGenerator().newNonce();
|
||||
return mutate(controller, loc, stub, req,
|
||||
(info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
|
||||
@Override
|
||||
public long getScanTimeout(TimeUnit unit) {
|
||||
return rawTable.getScanTimeout(unit);
|
||||
}
|
||||
|
||||
private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
|
||||
return conn.callerFactory.<T> single().table(tableName).row(row)
|
||||
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
|
||||
return newCaller(row.getRow(), rpcTimeoutNs);
|
||||
private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
|
||||
CompletableFuture<T> asyncFuture = new CompletableFuture<>();
|
||||
future.whenCompleteAsync((r, e) -> {
|
||||
if (e != null) {
|
||||
asyncFuture.completeExceptionally(e);
|
||||
} else {
|
||||
asyncFuture.complete(r);
|
||||
}
|
||||
}, pool);
|
||||
return asyncFuture;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Result> get(Get get) {
|
||||
return this.<Result> newCaller(get, readRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> AsyncTableImpl
|
||||
.<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
|
||||
RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
|
||||
(c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
|
||||
.call();
|
||||
return wrap(rawTable.get(get));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> put(Put put) {
|
||||
return this
|
||||
.<Void> newCaller(put, writeRpcTimeoutNs).action((controller, loc, stub) -> AsyncTableImpl
|
||||
.<Put> voidMutate(controller, loc, stub, put, RequestConverter::buildMutateRequest))
|
||||
.call();
|
||||
return wrap(rawTable.put(put));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> delete(Delete delete) {
|
||||
return this.<Void> newCaller(delete, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> AsyncTableImpl.<Delete> voidMutate(controller, loc, stub,
|
||||
delete, RequestConverter::buildMutateRequest))
|
||||
.call();
|
||||
return wrap(rawTable.delete(delete));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Result> append(Append append) {
|
||||
checkHasFamilies(append);
|
||||
return this.<Result> newCaller(append, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
|
||||
append, RequestConverter::buildMutateRequest, AsyncTableImpl::toResult))
|
||||
.call();
|
||||
return wrap(rawTable.append(append));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Result> increment(Increment increment) {
|
||||
checkHasFamilies(increment);
|
||||
return this.<Result> newCaller(increment, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
|
||||
stub, increment, RequestConverter::buildMutateRequest, AsyncTableImpl::toResult))
|
||||
.call();
|
||||
return wrap(rawTable.increment(increment));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, Put put) {
|
||||
return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> AsyncTableImpl.<Put, Boolean> mutate(controller, loc,
|
||||
stub, put,
|
||||
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(compareOp.name()), p),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
return wrap(rawTable.checkAndPut(row, family, qualifier, compareOp, value, put));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, Delete delete) {
|
||||
return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> AsyncTableImpl.<Delete, Boolean> mutate(controller, loc,
|
||||
stub, delete,
|
||||
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(compareOp.name()), d),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
}
|
||||
|
||||
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
|
||||
// so here I write a new method as I do not want to change the abstraction of call method.
|
||||
private static <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
|
||||
HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
|
||||
Converter<MultiRequest, byte[], RowMutations> reqConvert,
|
||||
Function<Result, RESP> respConverter) {
|
||||
CompletableFuture<RESP> future = new CompletableFuture<>();
|
||||
try {
|
||||
byte[] regionName = loc.getRegionInfo().getRegionName();
|
||||
MultiRequest req = reqConvert.convert(regionName, mutation);
|
||||
stub.multi(controller, req, new RpcCallback<MultiResponse>() {
|
||||
|
||||
@Override
|
||||
public void run(MultiResponse resp) {
|
||||
if (controller.failed()) {
|
||||
future.completeExceptionally(controller.getFailed());
|
||||
} else {
|
||||
try {
|
||||
org.apache.hadoop.hbase.client.MultiResponse multiResp =
|
||||
ResponseConverter.getResults(req, resp, controller.cellScanner());
|
||||
Throwable ex = multiResp.getException(regionName);
|
||||
if (ex != null) {
|
||||
future
|
||||
.completeExceptionally(ex instanceof IOException ? ex
|
||||
: new IOException(
|
||||
"Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()),
|
||||
ex));
|
||||
} else {
|
||||
future.complete(respConverter
|
||||
.apply((Result) multiResp.getResults().get(regionName).result.get(0)));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
return future;
|
||||
return wrap(rawTable.checkAndDelete(row, family, qualifier, compareOp, value, delete));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
|
||||
return this.<Void> newCaller(mutation, writeRpcTimeoutNs).action((controller, loc,
|
||||
stub) -> AsyncTableImpl.<Void> mutateRow(controller, loc, stub, mutation, (rn, rm) -> {
|
||||
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
|
||||
regionMutationBuilder.setAtomic(true);
|
||||
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
|
||||
}, (resp) -> {
|
||||
return null;
|
||||
})).call();
|
||||
return wrap(rawTable.mutateRow(mutation));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, RowMutations mutation) {
|
||||
return this.<Boolean> newCaller(mutation, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> AsyncTableImpl.<Boolean> mutateRow(controller, loc, stub,
|
||||
mutation,
|
||||
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(compareOp.name()), rm),
|
||||
(resp) -> resp.getExists()))
|
||||
.call();
|
||||
}
|
||||
|
||||
private <T> CompletableFuture<T> failedFuture(Throwable error) {
|
||||
CompletableFuture<T> future = new CompletableFuture<>();
|
||||
future.completeExceptionally(error);
|
||||
return future;
|
||||
}
|
||||
|
||||
private Scan setDefaultScanConfig(Scan scan) {
|
||||
// always create a new scan object as we may reset the start row later.
|
||||
Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
|
||||
if (newScan.getCaching() <= 0) {
|
||||
newScan.setCaching(defaultScannerCaching);
|
||||
}
|
||||
if (newScan.getMaxResultSize() <= 0) {
|
||||
newScan.setMaxResultSize(defaultScannerMaxResultSize);
|
||||
}
|
||||
return newScan;
|
||||
return wrap(rawTable.checkAndMutate(row, family, qualifier, compareOp, value, mutation));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) {
|
||||
if (!scan.isSmall()) {
|
||||
return failedFuture(new IllegalArgumentException("Only small scan is allowed"));
|
||||
}
|
||||
if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
|
||||
return failedFuture(
|
||||
new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
|
||||
}
|
||||
return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan))
|
||||
.limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
|
||||
return wrap(rawTable.smallScan(scan, limit));
|
||||
}
|
||||
|
||||
public void scan(Scan scan, ScanResultConsumer consumer) {
|
||||
if (scan.isSmall()) {
|
||||
if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
|
||||
consumer.onError(
|
||||
new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
|
||||
} else {
|
||||
LOG.warn("This is small scan " + scan + ", consider using smallScan directly?");
|
||||
}
|
||||
}
|
||||
scan = setDefaultScanConfig(scan);
|
||||
new AsyncClientScanner(scan, consumer, tableName, conn, scanTimeoutNs, readRpcTimeoutNs)
|
||||
.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setReadRpcTimeout(long timeout, TimeUnit unit) {
|
||||
this.readRpcTimeoutNs = unit.toNanos(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getReadRpcTimeout(TimeUnit unit) {
|
||||
return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWriteRpcTimeout(long timeout, TimeUnit unit) {
|
||||
this.writeRpcTimeoutNs = unit.toNanos(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteRpcTimeout(TimeUnit unit) {
|
||||
return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setOperationTimeout(long timeout, TimeUnit unit) {
|
||||
this.operationTimeoutNs = unit.toNanos(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getOperationTimeout(TimeUnit unit) {
|
||||
return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setScanTimeout(long timeout, TimeUnit unit) {
|
||||
this.scanTimeoutNs = unit.toNanos(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getScanTimeout(TimeUnit unit) {
|
||||
return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,11 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Queue;
|
||||
|
@ -31,6 +27,12 @@ import java.util.concurrent.LinkedBlockingQueue;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
||||
/**
|
||||
* ClientAsyncPrefetchScanner implements async scanner behaviour.
|
||||
* Specifically, the cache used by this scanner is a concurrent queue which allows both
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
|
||||
|
||||
|
@ -36,7 +37,6 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -554,15 +554,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
|
|||
return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
|
||||
}
|
||||
|
||||
protected long calcEstimatedSize(Result rs) {
|
||||
long estimatedHeapSizeOfResult = 0;
|
||||
// We don't make Iterator here
|
||||
for (Cell cell : rs.rawCells()) {
|
||||
estimatedHeapSizeOfResult += CellUtil.estimatedHeapSizeOf(cell);
|
||||
}
|
||||
return estimatedHeapSizeOfResult;
|
||||
}
|
||||
|
||||
protected void addEstimatedSize(long estimatedHeapSizeOfResult) {
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -35,6 +35,9 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -290,4 +293,35 @@ public final class ConnectionUtils {
|
|||
}
|
||||
return t;
|
||||
}
|
||||
|
||||
static long calcEstimatedSize(Result rs) {
|
||||
long estimatedHeapSizeOfResult = 0;
|
||||
// We don't make Iterator here
|
||||
for (Cell cell : rs.rawCells()) {
|
||||
estimatedHeapSizeOfResult += CellUtil.estimatedHeapSizeOf(cell);
|
||||
}
|
||||
return estimatedHeapSizeOfResult;
|
||||
}
|
||||
|
||||
static Result filterCells(Result result, Cell keepCellsAfter) {
|
||||
// not the same row
|
||||
if (!CellUtil.matchingRow(keepCellsAfter, result.getRow(), 0, result.getRow().length)) {
|
||||
return result;
|
||||
}
|
||||
Cell[] rawCells = result.rawCells();
|
||||
int index = Arrays.binarySearch(rawCells, keepCellsAfter, CellComparator::compareWithoutRow);
|
||||
if (index < 0) {
|
||||
index = -index - 1;
|
||||
} else {
|
||||
index++;
|
||||
}
|
||||
if (index == 0) {
|
||||
return result;
|
||||
}
|
||||
if (index == rawCells.length) {
|
||||
return null;
|
||||
}
|
||||
return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
|
||||
result.isStale(), true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* A low level asynchronous table.
|
||||
* <p>
|
||||
* The returned {@code CompletableFuture} will be finished directly in the rpc framework's callback
|
||||
* thread, so typically you should not do any time consuming work inside these methods, otherwise
|
||||
* you will be likely to block at least one connection to RS(even more if the rpc framework uses
|
||||
* NIO).
|
||||
* <p>
|
||||
* So, only experts that want to build high performance service should use this interface directly,
|
||||
* especially for the {@link #scan(Scan, ScanResultConsumer)} below.
|
||||
* <p>
|
||||
* TODO: For now the only difference between this interface and {@link AsyncTable} is the scan
|
||||
* method. The {@link ScanResultConsumer} exposes the implementation details of a scan(heartbeat) so
|
||||
* it is not suitable for a normal user. If it is still the only difference after we implement most
|
||||
* features of AsyncTable, we can think about merge these two interfaces.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
public interface RawAsyncTable extends AsyncTableBase {
|
||||
|
||||
/**
|
||||
* The basic scan API uses the observer pattern. All results that match the given scan object will
|
||||
* be passed to the given {@code consumer} by calling {@link ScanResultConsumer#onNext(Result[])}.
|
||||
* {@link ScanResultConsumer#onComplete()} means the scan is finished, and
|
||||
* {@link ScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan
|
||||
* is terminated. {@link ScanResultConsumer#onHeartbeat()} means the RS is still working but we
|
||||
* can not get a valid result to call {@link ScanResultConsumer#onNext(Result[])}. This is usually
|
||||
* because the matched results are too sparse, for example, a filter which almost filters out
|
||||
* everything is specified.
|
||||
* <p>
|
||||
* Notice that, the methods of the given {@code consumer} will be called directly in the rpc
|
||||
* framework's callback thread, so typically you should not do any time consuming work inside
|
||||
* these methods, otherwise you will be likely to block at least one connection to RS(even more if
|
||||
* the rpc framework uses NIO).
|
||||
* @param scan A configured {@link Scan} object.
|
||||
* @param consumer the consumer used to receive results.
|
||||
*/
|
||||
void scan(Scan scan, ScanResultConsumer consumer);
|
||||
}
|
|
@ -0,0 +1,408 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* The implementation of RawAsyncTable.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class RawAsyncTableImpl implements RawAsyncTable {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(RawAsyncTableImpl.class);
|
||||
|
||||
private final AsyncConnectionImpl conn;
|
||||
|
||||
private final TableName tableName;
|
||||
|
||||
private final int defaultScannerCaching;
|
||||
|
||||
private final long defaultScannerMaxResultSize;
|
||||
|
||||
private long readRpcTimeoutNs;
|
||||
|
||||
private long writeRpcTimeoutNs;
|
||||
|
||||
private long operationTimeoutNs;
|
||||
|
||||
private long scanTimeoutNs;
|
||||
|
||||
public RawAsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) {
|
||||
this.conn = conn;
|
||||
this.tableName = tableName;
|
||||
this.readRpcTimeoutNs = conn.connConf.getReadRpcTimeoutNs();
|
||||
this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs();
|
||||
this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs()
|
||||
: conn.connConf.getOperationTimeoutNs();
|
||||
this.defaultScannerCaching = conn.connConf.getScannerCaching();
|
||||
this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
|
||||
this.scanTimeoutNs = conn.connConf.getScanTimeoutNs();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableName getName() {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return conn.getConfiguration();
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
private interface Converter<D, I, S> {
|
||||
D convert(I info, S src) throws IOException;
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
private interface RpcCall<RESP, REQ> {
|
||||
void call(ClientService.Interface stub, HBaseRpcController controller, REQ req,
|
||||
RpcCallback<RESP> done);
|
||||
}
|
||||
|
||||
private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(
|
||||
HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
|
||||
Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall,
|
||||
Converter<RESP, HBaseRpcController, PRESP> respConverter) {
|
||||
CompletableFuture<RESP> future = new CompletableFuture<>();
|
||||
try {
|
||||
rpcCall.call(stub, controller, reqConvert.convert(loc.getRegionInfo().getRegionName(), req),
|
||||
new RpcCallback<PRESP>() {
|
||||
|
||||
@Override
|
||||
public void run(PRESP resp) {
|
||||
if (controller.failed()) {
|
||||
future.completeExceptionally(controller.getFailed());
|
||||
} else {
|
||||
try {
|
||||
future.complete(respConverter.convert(controller, resp));
|
||||
} catch (IOException e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
|
||||
HRegionLocation loc, ClientService.Interface stub, REQ req,
|
||||
Converter<MutateRequest, byte[], REQ> reqConvert,
|
||||
Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
|
||||
return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done),
|
||||
respConverter);
|
||||
}
|
||||
|
||||
private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
|
||||
HRegionLocation loc, ClientService.Interface stub, REQ req,
|
||||
Converter<MutateRequest, byte[], REQ> reqConvert) {
|
||||
return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
private static Result toResult(HBaseRpcController controller, MutateResponse resp)
|
||||
throws IOException {
|
||||
if (!resp.hasResult()) {
|
||||
return null;
|
||||
}
|
||||
return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner());
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
private interface NoncedConverter<D, I, S> {
|
||||
D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
|
||||
}
|
||||
|
||||
private <REQ, RESP> CompletableFuture<RESP> noncedMutate(HBaseRpcController controller,
|
||||
HRegionLocation loc, ClientService.Interface stub, REQ req,
|
||||
NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
|
||||
Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
|
||||
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
|
||||
long nonce = conn.getNonceGenerator().newNonce();
|
||||
return mutate(controller, loc, stub, req,
|
||||
(info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
|
||||
}
|
||||
|
||||
private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
|
||||
return conn.callerFactory.<T> single().table(tableName).row(row)
|
||||
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
|
||||
return newCaller(row.getRow(), rpcTimeoutNs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Result> get(Get get) {
|
||||
return this.<Result> newCaller(get, readRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl
|
||||
.<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
|
||||
RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
|
||||
(c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> put(Put put) {
|
||||
return this.<Void> newCaller(put, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
|
||||
put, RequestConverter::buildMutateRequest))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> delete(Delete delete) {
|
||||
return this.<Void> newCaller(delete, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
|
||||
stub, delete, RequestConverter::buildMutateRequest))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Result> append(Append append) {
|
||||
checkHasFamilies(append);
|
||||
return this.<Result> newCaller(append, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
|
||||
append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Result> increment(Increment increment) {
|
||||
checkHasFamilies(increment);
|
||||
return this.<Result> newCaller(increment, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
|
||||
stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, Put put) {
|
||||
return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
|
||||
stub, put,
|
||||
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(compareOp.name()), p),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, Delete delete) {
|
||||
return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
|
||||
loc, stub, delete,
|
||||
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(compareOp.name()), d),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
}
|
||||
|
||||
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
|
||||
// so here I write a new method as I do not want to change the abstraction of call method.
|
||||
private static <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
|
||||
HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
|
||||
Converter<MultiRequest, byte[], RowMutations> reqConvert,
|
||||
Function<Result, RESP> respConverter) {
|
||||
CompletableFuture<RESP> future = new CompletableFuture<>();
|
||||
try {
|
||||
byte[] regionName = loc.getRegionInfo().getRegionName();
|
||||
MultiRequest req = reqConvert.convert(regionName, mutation);
|
||||
stub.multi(controller, req, new RpcCallback<MultiResponse>() {
|
||||
|
||||
@Override
|
||||
public void run(MultiResponse resp) {
|
||||
if (controller.failed()) {
|
||||
future.completeExceptionally(controller.getFailed());
|
||||
} else {
|
||||
try {
|
||||
org.apache.hadoop.hbase.client.MultiResponse multiResp =
|
||||
ResponseConverter.getResults(req, resp, controller.cellScanner());
|
||||
Throwable ex = multiResp.getException(regionName);
|
||||
if (ex != null) {
|
||||
future
|
||||
.completeExceptionally(ex instanceof IOException ? ex
|
||||
: new IOException(
|
||||
"Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()),
|
||||
ex));
|
||||
} else {
|
||||
future.complete(respConverter
|
||||
.apply((Result) multiResp.getResults().get(regionName).result.get(0)));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
|
||||
return this.<Void> newCaller(mutation, writeRpcTimeoutNs).action((controller, loc,
|
||||
stub) -> RawAsyncTableImpl.<Void> mutateRow(controller, loc, stub, mutation, (rn, rm) -> {
|
||||
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
|
||||
regionMutationBuilder.setAtomic(true);
|
||||
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
|
||||
}, (resp) -> {
|
||||
return null;
|
||||
})).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, RowMutations mutation) {
|
||||
return this.<Boolean> newCaller(mutation, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
|
||||
stub, mutation,
|
||||
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(compareOp.name()), rm),
|
||||
(resp) -> resp.getExists()))
|
||||
.call();
|
||||
}
|
||||
|
||||
private <T> CompletableFuture<T> failedFuture(Throwable error) {
|
||||
CompletableFuture<T> future = new CompletableFuture<>();
|
||||
future.completeExceptionally(error);
|
||||
return future;
|
||||
}
|
||||
|
||||
private Scan setDefaultScanConfig(Scan scan) {
|
||||
// always create a new scan object as we may reset the start row later.
|
||||
Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
|
||||
if (newScan.getCaching() <= 0) {
|
||||
newScan.setCaching(defaultScannerCaching);
|
||||
}
|
||||
if (newScan.getMaxResultSize() <= 0) {
|
||||
newScan.setMaxResultSize(defaultScannerMaxResultSize);
|
||||
}
|
||||
return newScan;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) {
|
||||
if (!scan.isSmall()) {
|
||||
return failedFuture(new IllegalArgumentException("Only small scan is allowed"));
|
||||
}
|
||||
if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
|
||||
return failedFuture(
|
||||
new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
|
||||
}
|
||||
return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan))
|
||||
.limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
|
||||
}
|
||||
|
||||
public void scan(Scan scan, ScanResultConsumer consumer) {
|
||||
if (scan.isSmall()) {
|
||||
if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
|
||||
consumer.onError(
|
||||
new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
|
||||
} else {
|
||||
LOG.warn("This is small scan " + scan + ", consider using smallScan directly?");
|
||||
}
|
||||
}
|
||||
scan = setDefaultScanConfig(scan);
|
||||
new AsyncClientScanner(scan, consumer, tableName, conn, scanTimeoutNs, readRpcTimeoutNs)
|
||||
.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setReadRpcTimeout(long timeout, TimeUnit unit) {
|
||||
this.readRpcTimeoutNs = unit.toNanos(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getReadRpcTimeout(TimeUnit unit) {
|
||||
return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWriteRpcTimeout(long timeout, TimeUnit unit) {
|
||||
this.writeRpcTimeoutNs = unit.toNanos(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteRpcTimeout(TimeUnit unit) {
|
||||
return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setOperationTimeout(long timeout, TimeUnit unit) {
|
||||
this.operationTimeoutNs = unit.toNanos(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getOperationTimeout(TimeUnit unit) {
|
||||
return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setScanTimeout(long timeout, TimeUnit unit) {
|
||||
this.scanTimeoutNs = unit.toNanos(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getScanTimeout(TimeUnit unit) {
|
||||
return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit);
|
||||
}
|
||||
}
|
|
@ -20,32 +20,90 @@ package org.apache.hadoop.hbase.client;
|
|||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Interface for client-side scanning.
|
||||
* Go to {@link Table} to obtain instances.
|
||||
* Interface for client-side scanning. Go to {@link Table} to obtain instances.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
public interface ResultScanner extends Closeable, Iterable<Result> {
|
||||
|
||||
@Override
|
||||
default Iterator<Result> iterator() {
|
||||
return new Iterator<Result>() {
|
||||
// The next RowResult, possibly pre-read
|
||||
Result next = null;
|
||||
|
||||
// return true if there is another item pending, false if there isn't.
|
||||
// this method is where the actual advancing takes place, but you need
|
||||
// to call next() to consume it. hasNext() will only advance if there
|
||||
// isn't a pending next().
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
if (next != null) {
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
return (next = ResultScanner.this.next()) != null;
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
// get the pending next item and advance the iterator. returns null if
|
||||
// there is no next item.
|
||||
@Override
|
||||
public Result next() {
|
||||
// since hasNext() does the real advancing, we call this to determine
|
||||
// if there is a next before proceeding.
|
||||
if (!hasNext()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// if we get to here, then hasNext() has given us an item to return.
|
||||
// we want to return the item and then null out the next pointer, so
|
||||
// we use a temporary variable.
|
||||
Result temp = next;
|
||||
next = null;
|
||||
return temp;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Grab the next row's worth of values. The scanner will return a Result.
|
||||
* @return Result object if there is another row, null if the scanner is
|
||||
* exhausted.
|
||||
* @return Result object if there is another row, null if the scanner is exhausted.
|
||||
* @throws IOException e
|
||||
*/
|
||||
Result next() throws IOException;
|
||||
|
||||
/**
|
||||
* Get nbRows rows. How many RPCs are made is determined by the {@link Scan#setCaching(int)}
|
||||
* setting (or hbase.client.scanner.caching in hbase-site.xml).
|
||||
* @param nbRows number of rows to return
|
||||
* @return Between zero and nbRows results
|
||||
* @throws IOException e
|
||||
* @return Between zero and nbRows rowResults. Scan is done if returned array is of zero-length
|
||||
* (We never return null).
|
||||
* @throws IOException
|
||||
*/
|
||||
Result [] next(int nbRows) throws IOException;
|
||||
default Result[] next(int nbRows) throws IOException {
|
||||
List<Result> resultSets = new ArrayList<>(nbRows);
|
||||
for (int i = 0; i < nbRows; i++) {
|
||||
Result next = next();
|
||||
if (next != null) {
|
||||
resultSets.add(next);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return resultSets.toArray(new Result[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the scanner and releases any resources it has allocated
|
||||
|
|
|
@ -57,11 +57,12 @@ public abstract class AbstractTestAsyncTableScan {
|
|||
TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
|
||||
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
||||
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
|
||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||
RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
|
||||
List<CompletableFuture<?>> futures = new ArrayList<>();
|
||||
IntStream.range(0, COUNT).forEach(
|
||||
i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
|
||||
.addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))));
|
||||
IntStream.range(0, COUNT)
|
||||
.forEach(i -> futures.add(table.put(
|
||||
new Put(Bytes.toBytes(String.format("%03d", i))).addColumn(FAMILY, CQ1, Bytes.toBytes(i))
|
||||
.addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))));
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
|
||||
}
|
||||
|
||||
|
@ -73,11 +74,11 @@ public abstract class AbstractTestAsyncTableScan {
|
|||
|
||||
protected abstract Scan createScan();
|
||||
|
||||
protected abstract List<Result> doScan(AsyncTable table, Scan scan) throws Exception;
|
||||
protected abstract List<Result> doScan(Scan scan) throws Exception;
|
||||
|
||||
@Test
|
||||
public void testScanAll() throws Exception {
|
||||
List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME), createScan());
|
||||
List<Result> results = doScan(createScan());
|
||||
assertEquals(COUNT, results.size());
|
||||
IntStream.range(0, COUNT).forEach(i -> {
|
||||
Result result = results.get(i);
|
||||
|
@ -94,7 +95,7 @@ public abstract class AbstractTestAsyncTableScan {
|
|||
|
||||
@Test
|
||||
public void testReversedScanAll() throws Exception {
|
||||
List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME), createScan().setReversed(true));
|
||||
List<Result> results = doScan(createScan().setReversed(true));
|
||||
assertEquals(COUNT, results.size());
|
||||
IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
|
||||
}
|
||||
|
@ -102,8 +103,8 @@ public abstract class AbstractTestAsyncTableScan {
|
|||
@Test
|
||||
public void testScanNoStopKey() throws Exception {
|
||||
int start = 345;
|
||||
List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
|
||||
createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))));
|
||||
List<Result> results =
|
||||
doScan(createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))));
|
||||
assertEquals(COUNT - start, results.size());
|
||||
IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
|
||||
}
|
||||
|
@ -111,24 +112,24 @@ public abstract class AbstractTestAsyncTableScan {
|
|||
@Test
|
||||
public void testReverseScanNoStopKey() throws Exception {
|
||||
int start = 765;
|
||||
List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
|
||||
List<Result> results = doScan(
|
||||
createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true));
|
||||
assertEquals(start + 1, results.size());
|
||||
IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
|
||||
}
|
||||
|
||||
private void testScan(int start, int stop) throws Exception {
|
||||
List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
|
||||
createScan().setStartRow(Bytes.toBytes(String.format("%03d", start)))
|
||||
.setStopRow(Bytes.toBytes(String.format("%03d", stop))));
|
||||
List<Result> results =
|
||||
doScan(createScan().setStartRow(Bytes.toBytes(String.format("%03d", start)))
|
||||
.setStopRow(Bytes.toBytes(String.format("%03d", stop))));
|
||||
assertEquals(stop - start, results.size());
|
||||
IntStream.range(0, stop - start).forEach(i -> assertResultEquals(results.get(i), start + i));
|
||||
}
|
||||
|
||||
private void testReversedScan(int start, int stop) throws Exception {
|
||||
List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
|
||||
createScan().setStartRow(Bytes.toBytes(String.format("%03d", start)))
|
||||
.setStopRow(Bytes.toBytes(String.format("%03d", stop))).setReversed(true));
|
||||
List<Result> results =
|
||||
doScan(createScan().setStartRow(Bytes.toBytes(String.format("%03d", start)))
|
||||
.setStopRow(Bytes.toBytes(String.format("%03d", stop))).setReversed(true));
|
||||
assertEquals(start - stop, results.size());
|
||||
IntStream.range(0, start - stop).forEach(i -> assertResultEquals(results.get(i), start - i));
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ public class TestAsyncGetMultiThread {
|
|||
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
|
||||
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
||||
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
|
||||
AsyncTable table = CONN.getTable(TABLE_NAME);
|
||||
RawAsyncTable table = CONN.getRawTable(TABLE_NAME);
|
||||
List<CompletableFuture<?>> futures = new ArrayList<>();
|
||||
IntStream.range(0, COUNT)
|
||||
.forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
|
||||
|
@ -105,8 +105,9 @@ public class TestAsyncGetMultiThread {
|
|||
while (!stop.get()) {
|
||||
int i = ThreadLocalRandom.current().nextInt(COUNT);
|
||||
assertEquals(i,
|
||||
Bytes.toInt(CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i))))
|
||||
.get().getValue(FAMILY, QUALIFIER)));
|
||||
Bytes.toInt(
|
||||
CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get()
|
||||
.getValue(FAMILY, QUALIFIER)));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -99,7 +99,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
|
|||
int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegionInfo().getRegionName());
|
||||
TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes(
|
||||
TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName()));
|
||||
AsyncTable table = asyncConn.getTable(TABLE_NAME);
|
||||
RawAsyncTable table = asyncConn.getRawTable(TABLE_NAME);
|
||||
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
|
||||
|
||||
// move back
|
||||
|
@ -185,7 +185,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
|
|||
return mockedLocator;
|
||||
}
|
||||
}) {
|
||||
AsyncTable table = new AsyncTableImpl(mockedConn, TABLE_NAME);
|
||||
RawAsyncTable table = new RawAsyncTableImpl(mockedConn, TABLE_NAME);
|
||||
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
|
||||
assertTrue(errorTriggered.get());
|
||||
errorTriggered.set(false);
|
||||
|
|
|
@ -26,12 +26,15 @@ import static org.junit.Assert.assertTrue;
|
|||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -47,7 +50,12 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameter;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestAsyncTable {
|
||||
|
||||
|
@ -68,6 +76,23 @@ public class TestAsyncTable {
|
|||
|
||||
private byte[] row;
|
||||
|
||||
@Parameter
|
||||
public Supplier<AsyncTableBase> getTable;
|
||||
|
||||
private static RawAsyncTable getRawTable() {
|
||||
return ASYNC_CONN.getRawTable(TABLE_NAME);
|
||||
}
|
||||
|
||||
private static AsyncTable getTable() {
|
||||
return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
|
||||
}
|
||||
|
||||
@Parameters
|
||||
public static List<Object[]> params() {
|
||||
return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable },
|
||||
new Supplier<?>[] { TestAsyncTable::getTable });
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
|
@ -89,7 +114,7 @@ public class TestAsyncTable {
|
|||
|
||||
@Test
|
||||
public void testSimple() throws Exception {
|
||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||
AsyncTableBase table = getTable.get();
|
||||
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
|
||||
assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
|
||||
Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
|
||||
|
@ -106,7 +131,7 @@ public class TestAsyncTable {
|
|||
|
||||
@Test
|
||||
public void testSimpleMultiple() throws Exception {
|
||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||
AsyncTableBase table = getTable.get();
|
||||
int count = 100;
|
||||
CountDownLatch putLatch = new CountDownLatch(count);
|
||||
IntStream.range(0, count).forEach(
|
||||
|
@ -150,7 +175,7 @@ public class TestAsyncTable {
|
|||
|
||||
@Test
|
||||
public void testIncrement() throws InterruptedException, ExecutionException {
|
||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||
AsyncTableBase table = getTable.get();
|
||||
int count = 100;
|
||||
CountDownLatch latch = new CountDownLatch(count);
|
||||
AtomicLong sum = new AtomicLong(0L);
|
||||
|
@ -167,7 +192,7 @@ public class TestAsyncTable {
|
|||
|
||||
@Test
|
||||
public void testAppend() throws InterruptedException, ExecutionException {
|
||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||
AsyncTableBase table = getTable.get();
|
||||
int count = 10;
|
||||
CountDownLatch latch = new CountDownLatch(count);
|
||||
char suffix = ':';
|
||||
|
@ -190,7 +215,7 @@ public class TestAsyncTable {
|
|||
|
||||
@Test
|
||||
public void testCheckAndPut() throws InterruptedException, ExecutionException {
|
||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||
AsyncTableBase table = getTable.get();
|
||||
AtomicInteger successCount = new AtomicInteger(0);
|
||||
AtomicInteger successIndex = new AtomicInteger(-1);
|
||||
int count = 10;
|
||||
|
@ -211,7 +236,7 @@ public class TestAsyncTable {
|
|||
|
||||
@Test
|
||||
public void testCheckAndDelete() throws InterruptedException, ExecutionException {
|
||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||
AsyncTableBase table = getTable.get();
|
||||
int count = 10;
|
||||
CountDownLatch putLatch = new CountDownLatch(count + 1);
|
||||
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
|
||||
|
@ -223,17 +248,16 @@ public class TestAsyncTable {
|
|||
AtomicInteger successCount = new AtomicInteger(0);
|
||||
AtomicInteger successIndex = new AtomicInteger(-1);
|
||||
CountDownLatch deleteLatch = new CountDownLatch(count);
|
||||
IntStream.range(0, count)
|
||||
.forEach(i -> table
|
||||
.checkAndDelete(row, FAMILY, QUALIFIER, VALUE,
|
||||
new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
|
||||
.thenAccept(x -> {
|
||||
if (x) {
|
||||
successCount.incrementAndGet();
|
||||
successIndex.set(i);
|
||||
}
|
||||
deleteLatch.countDown();
|
||||
}));
|
||||
IntStream.range(0, count).forEach(i -> table
|
||||
.checkAndDelete(row, FAMILY, QUALIFIER, VALUE,
|
||||
new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
|
||||
.thenAccept(x -> {
|
||||
if (x) {
|
||||
successCount.incrementAndGet();
|
||||
successIndex.set(i);
|
||||
}
|
||||
deleteLatch.countDown();
|
||||
}));
|
||||
deleteLatch.await();
|
||||
assertEquals(1, successCount.get());
|
||||
Result result = table.get(new Get(row)).get();
|
||||
|
@ -248,7 +272,7 @@ public class TestAsyncTable {
|
|||
|
||||
@Test
|
||||
public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
|
||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||
AsyncTableBase table = getTable.get();
|
||||
RowMutations mutation = new RowMutations(row);
|
||||
mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
|
||||
table.mutateRow(mutation).get();
|
||||
|
@ -266,7 +290,7 @@ public class TestAsyncTable {
|
|||
|
||||
@Test
|
||||
public void testCheckAndMutate() throws InterruptedException, ExecutionException {
|
||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||
AsyncTableBase table = getTable.get();
|
||||
int count = 10;
|
||||
CountDownLatch putLatch = new CountDownLatch(count + 1);
|
||||
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
|
||||
|
|
|
@ -100,7 +100,7 @@ public class TestAsyncTableNoncedRetry {
|
|||
|
||||
@Test
|
||||
public void testAppend() throws InterruptedException, ExecutionException {
|
||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||
RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
|
||||
Result result = table.append(new Append(row).add(FAMILY, QUALIFIER, VALUE)).get();
|
||||
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
|
||||
result = table.append(new Append(row).add(FAMILY, QUALIFIER, VALUE)).get();
|
||||
|
@ -112,7 +112,7 @@ public class TestAsyncTableNoncedRetry {
|
|||
|
||||
@Test
|
||||
public void testIncrement() throws InterruptedException, ExecutionException {
|
||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||
RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
|
||||
assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());
|
||||
// the second call should have no effect as we always generate the same nonce.
|
||||
assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());
|
||||
|
|
|
@ -124,11 +124,11 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<Result> doScan(AsyncTable table, Scan scan) throws Exception {
|
||||
SimpleScanResultConsumer scanObserver = new SimpleScanResultConsumer();
|
||||
table.scan(scan, scanObserver);
|
||||
protected List<Result> doScan(Scan scan) throws Exception {
|
||||
SimpleScanResultConsumer scanConsumer = new SimpleScanResultConsumer();
|
||||
ASYNC_CONN.getRawTable(TABLE_NAME).scan(scan, scanConsumer);
|
||||
List<Result> results = new ArrayList<>();
|
||||
for (Result result; (result = scanObserver.take()) != null;) {
|
||||
for (Result result; (result = scanConsumer.take()) != null;) {
|
||||
results.add(result);
|
||||
}
|
||||
if (scan.getBatch() > 0) {
|
||||
|
|
|
@ -19,8 +19,11 @@ package org.apache.hadoop.hbase.client;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
|
@ -28,19 +31,44 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameter;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan {
|
||||
|
||||
@Parameter
|
||||
public Supplier<AsyncTableBase> getTable;
|
||||
|
||||
private static RawAsyncTable getRawTable() {
|
||||
return ASYNC_CONN.getRawTable(TABLE_NAME);
|
||||
}
|
||||
|
||||
private static AsyncTable getTable() {
|
||||
return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
|
||||
}
|
||||
|
||||
@Parameters
|
||||
public static List<Object[]> params() {
|
||||
return Arrays.asList(new Supplier<?>[] { TestAsyncTableSmallScan::getRawTable },
|
||||
new Supplier<?>[] { TestAsyncTableSmallScan::getTable });
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScanWithLimit() throws InterruptedException, ExecutionException {
|
||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||
AsyncTableBase table = getTable.get();
|
||||
int start = 111;
|
||||
int stop = 888;
|
||||
int limit = 300;
|
||||
List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
|
||||
.setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true),
|
||||
limit).get();
|
||||
List<Result> results =
|
||||
table
|
||||
.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
|
||||
.setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true),
|
||||
limit)
|
||||
.get();
|
||||
assertEquals(limit, results.size());
|
||||
IntStream.range(0, limit).forEach(i -> {
|
||||
Result result = results.get(i);
|
||||
|
@ -52,7 +80,7 @@ public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan {
|
|||
|
||||
@Test
|
||||
public void testReversedScanWithLimit() throws InterruptedException, ExecutionException {
|
||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||
AsyncTableBase table = getTable.get();
|
||||
int start = 888;
|
||||
int stop = 111;
|
||||
int limit = 300;
|
||||
|
@ -75,7 +103,7 @@ public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<Result> doScan(AsyncTable table, Scan scan) throws Exception {
|
||||
return table.smallScan(scan).get();
|
||||
protected List<Result> doScan(Scan scan) throws Exception {
|
||||
return getTable.get().smallScan(scan).get();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue