HBASE-16838 Implement basic scan
This commit is contained in:
parent
fa838b020d
commit
8a6d6aa239
|
@ -0,0 +1,98 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellComparator;
|
||||||
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A ScanResultCache that may return partial result.
|
||||||
|
* <p>
|
||||||
|
* As we can only scan from the starting of a row when error, so here we also implement the logic
|
||||||
|
* that skips the cells that have already been returned.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class AllowPartialScanResultCache implements ScanResultCache {
|
||||||
|
|
||||||
|
// used to filter out the cells that already returned to user as we always start from the
|
||||||
|
// beginning of a row when retry.
|
||||||
|
private Cell lastCell;
|
||||||
|
|
||||||
|
private Result filterCells(Result result) {
|
||||||
|
if (lastCell == null) {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
// not the same row
|
||||||
|
if (!CellUtil.matchingRow(lastCell, result.getRow(), 0, result.getRow().length)) {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
Cell[] rawCells = result.rawCells();
|
||||||
|
int index = Arrays.binarySearch(rawCells, lastCell, CellComparator::compareWithoutRow);
|
||||||
|
if (index < 0) {
|
||||||
|
index = -index - 1;
|
||||||
|
} else {
|
||||||
|
index++;
|
||||||
|
}
|
||||||
|
if (index == 0) {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
if (index == rawCells.length) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
|
||||||
|
result.isStale(), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateLastCell(Result result) {
|
||||||
|
lastCell = result.isPartial() ? result.rawCells()[result.rawCells().length - 1] : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
|
||||||
|
if (results.length == 0) {
|
||||||
|
return EMPTY_RESULT_ARRAY;
|
||||||
|
}
|
||||||
|
Result first = filterCells(results[0]);
|
||||||
|
if (results.length == 1) {
|
||||||
|
if (first == null) {
|
||||||
|
// do not update last cell if we filter out all cells
|
||||||
|
return EMPTY_RESULT_ARRAY;
|
||||||
|
}
|
||||||
|
updateLastCell(results[0]);
|
||||||
|
results[0] = first;
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
updateLastCell(results[results.length - 1]);
|
||||||
|
if (first == null) {
|
||||||
|
return Arrays.copyOfRange(results, 1, results.length);
|
||||||
|
}
|
||||||
|
results[0] = first;
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clear() {
|
||||||
|
// we do not cache anything
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,151 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
|
||||||
|
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The asynchronous client scanner implementation.
|
||||||
|
* <p>
|
||||||
|
* Here we will call OpenScanner first and use the returned scannerId to create a
|
||||||
|
* {@link AsyncScanSingleRegionRpcRetryingCaller} to do the real scan operation. The return value of
|
||||||
|
* {@link AsyncScanSingleRegionRpcRetryingCaller} will tell us whether open a new scanner or finish
|
||||||
|
* scan.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class AsyncClientScanner {
|
||||||
|
|
||||||
|
// We will use this scan object during the whole scan operation. The
|
||||||
|
// AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly.
|
||||||
|
private final Scan scan;
|
||||||
|
|
||||||
|
private final ScanResultConsumer consumer;
|
||||||
|
|
||||||
|
private final TableName tableName;
|
||||||
|
|
||||||
|
private final AsyncConnectionImpl conn;
|
||||||
|
|
||||||
|
private final long scanTimeoutNs;
|
||||||
|
|
||||||
|
private final long rpcTimeoutNs;
|
||||||
|
|
||||||
|
private final ScanResultCache resultCache;
|
||||||
|
|
||||||
|
public AsyncClientScanner(Scan scan, ScanResultConsumer consumer, TableName tableName,
|
||||||
|
AsyncConnectionImpl conn, long scanTimeoutNs, long rpcTimeoutNs) {
|
||||||
|
if (scan.getStartRow() == null) {
|
||||||
|
scan.setStartRow(EMPTY_START_ROW);
|
||||||
|
}
|
||||||
|
if (scan.getStopRow() == null) {
|
||||||
|
scan.setStopRow(EMPTY_END_ROW);
|
||||||
|
}
|
||||||
|
this.scan = scan;
|
||||||
|
this.consumer = consumer;
|
||||||
|
this.tableName = tableName;
|
||||||
|
this.conn = conn;
|
||||||
|
this.scanTimeoutNs = scanTimeoutNs;
|
||||||
|
this.rpcTimeoutNs = rpcTimeoutNs;
|
||||||
|
this.resultCache = scan.getAllowPartialResults() || scan.getBatch() > 0
|
||||||
|
? new AllowPartialScanResultCache() : new CompleteScanResultCache();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class OpenScannerResponse {
|
||||||
|
|
||||||
|
public final HRegionLocation loc;
|
||||||
|
|
||||||
|
public final ClientService.Interface stub;
|
||||||
|
|
||||||
|
public final long scannerId;
|
||||||
|
|
||||||
|
public OpenScannerResponse(HRegionLocation loc, Interface stub, long scannerId) {
|
||||||
|
this.loc = loc;
|
||||||
|
this.stub = stub;
|
||||||
|
this.scannerId = scannerId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
|
||||||
|
HRegionLocation loc, ClientService.Interface stub) {
|
||||||
|
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
|
||||||
|
try {
|
||||||
|
ScanRequest request =
|
||||||
|
RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan, 0, false);
|
||||||
|
stub.scan(controller, request, resp -> {
|
||||||
|
if (controller.failed()) {
|
||||||
|
future.completeExceptionally(controller.getFailed());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
future.complete(new OpenScannerResponse(loc, stub, resp.getScannerId()));
|
||||||
|
});
|
||||||
|
} catch (IOException e) {
|
||||||
|
future.completeExceptionally(e);
|
||||||
|
}
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startScan(OpenScannerResponse resp) {
|
||||||
|
conn.callerFactory.scanSingleRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub)
|
||||||
|
.setScan(scan).consumer(consumer).resultCache(resultCache)
|
||||||
|
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||||
|
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start()
|
||||||
|
.whenComplete((locateToPreviousRegion, error) -> {
|
||||||
|
if (error != null) {
|
||||||
|
consumer.onError(error);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (locateToPreviousRegion == null) {
|
||||||
|
consumer.onComplete();
|
||||||
|
} else {
|
||||||
|
openScanner(locateToPreviousRegion.booleanValue());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void openScanner(boolean locateToPreviousRegion) {
|
||||||
|
conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
|
||||||
|
.locateToPreviousRegion(locateToPreviousRegion)
|
||||||
|
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||||
|
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::callOpenScanner).call()
|
||||||
|
.whenComplete((resp, error) -> {
|
||||||
|
if (error != null) {
|
||||||
|
consumer.onError(error);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
startScan(resp);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() {
|
||||||
|
openScanner(scan.isReversed() && isEmptyStartRow(scan.getStartRow()));
|
||||||
|
}
|
||||||
|
}
|
|
@ -52,6 +52,8 @@ class AsyncConnectionConfiguration {
|
||||||
|
|
||||||
private final long metaOperationTimeoutNs;
|
private final long metaOperationTimeoutNs;
|
||||||
|
|
||||||
|
// timeout for a whole operation such as get, put or delete. Notice that scan will not be effected
|
||||||
|
// by this value, see scanTimeoutNs.
|
||||||
private final long operationTimeoutNs;
|
private final long operationTimeoutNs;
|
||||||
|
|
||||||
// timeout for each read rpc request
|
// timeout for each read rpc request
|
||||||
|
@ -67,6 +69,10 @@ class AsyncConnectionConfiguration {
|
||||||
/** How many retries are allowed before we start to log */
|
/** How many retries are allowed before we start to log */
|
||||||
private final int startLogErrorsCnt;
|
private final int startLogErrorsCnt;
|
||||||
|
|
||||||
|
// As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is
|
||||||
|
// crash. The RS will always return something before the rpc timeout or scan timeout to tell the
|
||||||
|
// client that it is still alive. The scan timeout is used as operation timeout for every
|
||||||
|
// operations in a scan, such as openScanner or next.
|
||||||
private final long scanTimeoutNs;
|
private final long scanTimeoutNs;
|
||||||
|
|
||||||
private final int scannerCaching;
|
private final int scannerCaching;
|
||||||
|
@ -125,15 +131,15 @@ class AsyncConnectionConfiguration {
|
||||||
return startLogErrorsCnt;
|
return startLogErrorsCnt;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getScanTimeoutNs() {
|
long getScanTimeoutNs() {
|
||||||
return scanTimeoutNs;
|
return scanTimeoutNs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getScannerCaching() {
|
int getScannerCaching() {
|
||||||
return scannerCaching;
|
return scannerCaching;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getScannerMaxResultSize() {
|
long getScannerMaxResultSize() {
|
||||||
return scannerMaxResultSize;
|
return scannerMaxResultSize;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,7 +92,7 @@ class AsyncConnectionImpl implements AsyncConnection {
|
||||||
this.user = user;
|
this.user = user;
|
||||||
this.connConf = new AsyncConnectionConfiguration(conf);
|
this.connConf = new AsyncConnectionConfiguration(conf);
|
||||||
this.locator = new AsyncRegionLocator(this);
|
this.locator = new AsyncRegionLocator(this);
|
||||||
this.registry = ClusterRegistryFactory.getRegistry(conf);
|
this.registry = AsyncRegistryFactory.getRegistry(conf);
|
||||||
this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
|
this.clusterId = Optional.ofNullable(registry.getClusterId()).orElseGet(() -> {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("cluster id came back null, using default " + CLUSTER_ID_DEFAULT);
|
LOG.debug("cluster id came back null, using default " + CLUSTER_ID_DEFAULT);
|
||||||
|
|
|
@ -25,11 +25,11 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
* Get instance of configured Registry.
|
* Get instance of configured Registry.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
final class ClusterRegistryFactory {
|
final class AsyncRegistryFactory {
|
||||||
|
|
||||||
static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
|
static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
|
||||||
|
|
||||||
private ClusterRegistryFactory() {
|
private AsyncRegistryFactory() {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
|
@ -26,8 +26,10 @@ import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Factory to create an AsyncRpcRetryCaller.
|
* Factory to create an AsyncRpcRetryCaller.
|
||||||
|
@ -171,4 +173,88 @@ class AsyncRpcRetryingCallerFactory {
|
||||||
public SmallScanCallerBuilder smallScan() {
|
public SmallScanCallerBuilder smallScan() {
|
||||||
return new SmallScanCallerBuilder();
|
return new SmallScanCallerBuilder();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public class ScanSingleRegionCallerBuilder {
|
||||||
|
|
||||||
|
private long scannerId = -1L;
|
||||||
|
|
||||||
|
private Scan scan;
|
||||||
|
|
||||||
|
private ScanResultCache resultCache;
|
||||||
|
|
||||||
|
private ScanResultConsumer consumer;
|
||||||
|
|
||||||
|
private ClientService.Interface stub;
|
||||||
|
|
||||||
|
private HRegionLocation loc;
|
||||||
|
|
||||||
|
private long scanTimeoutNs;
|
||||||
|
|
||||||
|
private long rpcTimeoutNs;
|
||||||
|
|
||||||
|
public ScanSingleRegionCallerBuilder id(long scannerId) {
|
||||||
|
this.scannerId = scannerId;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScanSingleRegionCallerBuilder setScan(Scan scan) {
|
||||||
|
this.scan = scan;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScanSingleRegionCallerBuilder resultCache(ScanResultCache resultCache) {
|
||||||
|
this.resultCache = resultCache;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScanSingleRegionCallerBuilder consumer(ScanResultConsumer consumer) {
|
||||||
|
this.consumer = consumer;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScanSingleRegionCallerBuilder stub(ClientService.Interface stub) {
|
||||||
|
this.stub = stub;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScanSingleRegionCallerBuilder location(HRegionLocation loc) {
|
||||||
|
this.loc = loc;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
|
||||||
|
this.scanTimeoutNs = unit.toNanos(scanTimeout);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScanSingleRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
|
||||||
|
this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AsyncScanSingleRegionRpcRetryingCaller build() {
|
||||||
|
checkArgument(scannerId >= 0, "invalid scannerId %d", scannerId);
|
||||||
|
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
|
||||||
|
checkNotNull(scan, "scan is null"), scannerId,
|
||||||
|
checkNotNull(resultCache, "resultCache is null"),
|
||||||
|
checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
|
||||||
|
checkNotNull(loc, "location is null"), conn.connConf.getPauseNs(),
|
||||||
|
conn.connConf.getMaxRetries(), scanTimeoutNs, rpcTimeoutNs,
|
||||||
|
conn.connConf.getStartLogErrorsCnt());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Short cut for {@code build().start()}.
|
||||||
|
*/
|
||||||
|
public CompletableFuture<Boolean> start() {
|
||||||
|
return build().start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create retry caller for scanning a region.
|
||||||
|
*/
|
||||||
|
public ScanSingleRegionCallerBuilder scanSingleRegion() {
|
||||||
|
return new ScanSingleRegionCallerBuilder();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,351 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
|
||||||
|
|
||||||
|
import io.netty.util.HashedWheelTimer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
|
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
|
||||||
|
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
||||||
|
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retry caller for scanning a region.
|
||||||
|
* <p>
|
||||||
|
* We will modify the {@link Scan} object passed in directly. The upper layer should store the
|
||||||
|
* reference of this object and use it to open new single region scanners.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class AsyncScanSingleRegionRpcRetryingCaller {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(AsyncScanSingleRegionRpcRetryingCaller.class);
|
||||||
|
|
||||||
|
private final HashedWheelTimer retryTimer;
|
||||||
|
|
||||||
|
private final Scan scan;
|
||||||
|
|
||||||
|
private final long scannerId;
|
||||||
|
|
||||||
|
private final ScanResultCache resultCache;
|
||||||
|
|
||||||
|
private final ScanResultConsumer consumer;
|
||||||
|
|
||||||
|
private final ClientService.Interface stub;
|
||||||
|
|
||||||
|
private final HRegionLocation loc;
|
||||||
|
|
||||||
|
private final long pauseNs;
|
||||||
|
|
||||||
|
private final int maxAttempts;
|
||||||
|
|
||||||
|
private final long scanTimeoutNs;
|
||||||
|
|
||||||
|
private final long rpcTimeoutNs;
|
||||||
|
|
||||||
|
private final int startLogErrorsCnt;
|
||||||
|
|
||||||
|
private final Supplier<byte[]> createNextStartRowWhenError;
|
||||||
|
|
||||||
|
private final Runnable completeWhenNoMoreResultsInRegion;
|
||||||
|
|
||||||
|
private final CompletableFuture<Boolean> future;
|
||||||
|
|
||||||
|
private final HBaseRpcController controller;
|
||||||
|
|
||||||
|
private byte[] nextStartRowWhenError;
|
||||||
|
|
||||||
|
private boolean includeNextStartRowWhenError;
|
||||||
|
|
||||||
|
private long nextCallStartNs;
|
||||||
|
|
||||||
|
private int tries = 1;
|
||||||
|
|
||||||
|
private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
|
||||||
|
|
||||||
|
private long nextCallSeq = -1L;
|
||||||
|
|
||||||
|
public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
|
||||||
|
AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache,
|
||||||
|
ScanResultConsumer consumer, Interface stub, HRegionLocation loc, long pauseNs,
|
||||||
|
int maxRetries, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
|
||||||
|
this.retryTimer = retryTimer;
|
||||||
|
this.scan = scan;
|
||||||
|
this.scannerId = scannerId;
|
||||||
|
this.resultCache = resultCache;
|
||||||
|
this.consumer = consumer;
|
||||||
|
this.stub = stub;
|
||||||
|
this.loc = loc;
|
||||||
|
this.pauseNs = pauseNs;
|
||||||
|
this.maxAttempts = retries2Attempts(maxRetries);
|
||||||
|
this.scanTimeoutNs = scanTimeoutNs;
|
||||||
|
this.rpcTimeoutNs = rpcTimeoutNs;
|
||||||
|
this.startLogErrorsCnt = startLogErrorsCnt;
|
||||||
|
if (scan.isReversed()) {
|
||||||
|
createNextStartRowWhenError = this::createReversedNextStartRowWhenError;
|
||||||
|
completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
|
||||||
|
} else {
|
||||||
|
createNextStartRowWhenError = this::createNextStartRowWhenError;
|
||||||
|
completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
|
||||||
|
}
|
||||||
|
this.future = new CompletableFuture<>();
|
||||||
|
this.controller = conn.rpcControllerFactory.newController();
|
||||||
|
this.exceptions = new ArrayList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
private long elapsedMs() {
|
||||||
|
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void closeScanner() {
|
||||||
|
resetController(controller, rpcTimeoutNs);
|
||||||
|
ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
|
||||||
|
stub.scan(controller, req, resp -> {
|
||||||
|
if (controller.failed()) {
|
||||||
|
LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId
|
||||||
|
+ " for " + loc.getRegionInfo().getEncodedName() + " of "
|
||||||
|
+ loc.getRegionInfo().getTable() + " failed, ignore, probably already closed",
|
||||||
|
controller.getFailed());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void completeExceptionally(boolean closeScanner) {
|
||||||
|
resultCache.clear();
|
||||||
|
if (closeScanner) {
|
||||||
|
closeScanner();
|
||||||
|
}
|
||||||
|
future.completeExceptionally(new RetriesExhaustedException(tries, exceptions));
|
||||||
|
}
|
||||||
|
|
||||||
|
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
|
||||||
|
justification = "https://github.com/findbugsproject/findbugs/issues/79")
|
||||||
|
private void completeNoMoreResults() {
|
||||||
|
future.complete(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void completeWithNextStartRow(byte[] nextStartRow) {
|
||||||
|
scan.setStartRow(nextStartRow);
|
||||||
|
future.complete(scan.isReversed());
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] createNextStartRowWhenError() {
|
||||||
|
return createClosestRowAfter(nextStartRowWhenError);
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] createReversedNextStartRowWhenError() {
|
||||||
|
return createClosestRowBefore(nextStartRowWhenError);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void completeWhenError(boolean closeScanner) {
|
||||||
|
resultCache.clear();
|
||||||
|
if (closeScanner) {
|
||||||
|
closeScanner();
|
||||||
|
}
|
||||||
|
if (nextStartRowWhenError != null) {
|
||||||
|
scan.setStartRow(
|
||||||
|
includeNextStartRowWhenError ? nextStartRowWhenError : createNextStartRowWhenError.get());
|
||||||
|
}
|
||||||
|
future.complete(
|
||||||
|
scan.isReversed() && Bytes.equals(scan.getStartRow(), loc.getRegionInfo().getEndKey()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onError(Throwable error) {
|
||||||
|
error = translateException(error);
|
||||||
|
if (tries > startLogErrorsCnt) {
|
||||||
|
LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for "
|
||||||
|
+ loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable()
|
||||||
|
+ " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
|
||||||
|
+ TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs()
|
||||||
|
+ " ms",
|
||||||
|
error);
|
||||||
|
}
|
||||||
|
boolean scannerClosed =
|
||||||
|
error instanceof UnknownScannerException || error instanceof NotServingRegionException
|
||||||
|
|| error instanceof RegionServerStoppedException;
|
||||||
|
RetriesExhaustedException.ThrowableWithExtraContext qt =
|
||||||
|
new RetriesExhaustedException.ThrowableWithExtraContext(error,
|
||||||
|
EnvironmentEdgeManager.currentTime(), "");
|
||||||
|
exceptions.add(qt);
|
||||||
|
if (tries >= maxAttempts) {
|
||||||
|
completeExceptionally(!scannerClosed);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
long delayNs;
|
||||||
|
if (scanTimeoutNs > 0) {
|
||||||
|
long maxDelayNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
|
||||||
|
if (maxDelayNs <= 0) {
|
||||||
|
completeExceptionally(!scannerClosed);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
|
||||||
|
} else {
|
||||||
|
delayNs = getPauseTime(pauseNs, tries - 1);
|
||||||
|
}
|
||||||
|
if (scannerClosed) {
|
||||||
|
completeWhenError(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (error instanceof OutOfOrderScannerNextException || error instanceof ScannerResetException) {
|
||||||
|
completeWhenError(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (error instanceof DoNotRetryIOException) {
|
||||||
|
completeExceptionally(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
tries++;
|
||||||
|
retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateNextStartRowWhenError(Result result) {
|
||||||
|
nextStartRowWhenError = result.getRow();
|
||||||
|
includeNextStartRowWhenError = result.isPartial();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void completeWhenNoMoreResultsInRegion() {
|
||||||
|
if (isEmptyStopRow(scan.getStopRow())) {
|
||||||
|
if (isEmptyStopRow(loc.getRegionInfo().getEndKey())) {
|
||||||
|
completeNoMoreResults();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (Bytes.compareTo(loc.getRegionInfo().getEndKey(), scan.getStopRow()) >= 0) {
|
||||||
|
completeNoMoreResults();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
completeWithNextStartRow(loc.getRegionInfo().getEndKey());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void completeReversedWhenNoMoreResultsInRegion() {
|
||||||
|
if (isEmptyStopRow(scan.getStopRow())) {
|
||||||
|
if (isEmptyStartRow(loc.getRegionInfo().getStartKey())) {
|
||||||
|
completeNoMoreResults();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (Bytes.compareTo(loc.getRegionInfo().getStartKey(), scan.getStopRow()) <= 0) {
|
||||||
|
completeNoMoreResults();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
completeWithNextStartRow(loc.getRegionInfo().getStartKey());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onComplete(ScanResponse resp) {
|
||||||
|
if (controller.failed()) {
|
||||||
|
onError(controller.getFailed());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
|
||||||
|
Result[] results;
|
||||||
|
try {
|
||||||
|
results = resultCache.addAndGet(
|
||||||
|
Optional.ofNullable(ResponseConverter.getResults(controller.cellScanner(), resp))
|
||||||
|
.orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
|
||||||
|
isHeartbeatMessage);
|
||||||
|
} catch (IOException e) {
|
||||||
|
// We can not retry here. The server has responded normally and the call sequence has been
|
||||||
|
// increased so a new scan with the same call sequence will cause an
|
||||||
|
// OutOfOrderScannerNextException. Let the upper layer open a new scanner.
|
||||||
|
LOG.warn("decode scan response failed", e);
|
||||||
|
completeWhenError(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean stopByUser;
|
||||||
|
if (results.length == 0) {
|
||||||
|
// if we have nothing to return then this must be a heartbeat message.
|
||||||
|
stopByUser = !consumer.onHeartbeat();
|
||||||
|
} else {
|
||||||
|
updateNextStartRowWhenError(results[results.length - 1]);
|
||||||
|
stopByUser = !consumer.onNext(results);
|
||||||
|
}
|
||||||
|
if (resp.hasMoreResults() && !resp.getMoreResults()) {
|
||||||
|
// RS tells us there is no more data for the whole scan
|
||||||
|
completeNoMoreResults();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (stopByUser) {
|
||||||
|
if (resp.getMoreResultsInRegion()) {
|
||||||
|
// we have more results in region but user request to stop the scan, so we need to close the
|
||||||
|
// scanner explicitly.
|
||||||
|
closeScanner();
|
||||||
|
}
|
||||||
|
completeNoMoreResults();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// as in 2.0 this value will always be set
|
||||||
|
if (!resp.getMoreResultsInRegion()) {
|
||||||
|
completeWhenNoMoreResultsInRegion.run();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
next();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void call() {
|
||||||
|
resetController(controller, rpcTimeoutNs);
|
||||||
|
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
|
||||||
|
nextCallSeq, false, false);
|
||||||
|
stub.scan(controller, req, this::onComplete);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void next() {
|
||||||
|
nextCallSeq++;
|
||||||
|
tries = 0;
|
||||||
|
exceptions.clear();
|
||||||
|
nextCallStartNs = System.nanoTime();
|
||||||
|
call();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return return locate direction for next open scanner call, or null if we should stop.
|
||||||
|
*/
|
||||||
|
public CompletableFuture<Boolean> start() {
|
||||||
|
next();
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,14 +18,13 @@
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
|
||||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
|
||||||
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
|
||||||
|
|
||||||
import io.netty.util.HashedWheelTimer;
|
import io.netty.util.HashedWheelTimer;
|
||||||
import io.netty.util.Timeout;
|
|
||||||
import io.netty.util.TimerTask;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.UndeclaredThrowableException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
@ -40,11 +39,9 @@ import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retry caller for a single request, such as get, put, delete, etc.
|
* Retry caller for a single request, such as get, put, delete, etc.
|
||||||
|
@ -121,19 +118,6 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
|
||||||
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
|
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Throwable translateException(Throwable t) {
|
|
||||||
if (t instanceof UndeclaredThrowableException && t.getCause() != null) {
|
|
||||||
t = t.getCause();
|
|
||||||
}
|
|
||||||
if (t instanceof RemoteException) {
|
|
||||||
t = ((RemoteException) t).unwrapRemoteException();
|
|
||||||
}
|
|
||||||
if (t instanceof ServiceException && t.getCause() != null) {
|
|
||||||
t = translateException(t.getCause());
|
|
||||||
}
|
|
||||||
return t;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void completeExceptionally() {
|
private void completeExceptionally() {
|
||||||
future.completeExceptionally(new RetriesExhaustedException(tries, exceptions));
|
future.completeExceptionally(new RetriesExhaustedException(tries, exceptions));
|
||||||
}
|
}
|
||||||
|
@ -165,22 +149,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
|
||||||
}
|
}
|
||||||
updateCachedLocation.accept(error);
|
updateCachedLocation.accept(error);
|
||||||
tries++;
|
tries++;
|
||||||
retryTimer.newTimeout(new TimerTask() {
|
retryTimer.newTimeout(t -> locateThenCall(), delayNs, TimeUnit.NANOSECONDS);
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run(Timeout timeout) throws Exception {
|
|
||||||
// always restart from beginning.
|
|
||||||
locateThenCall();
|
|
||||||
}
|
|
||||||
}, delayNs, TimeUnit.NANOSECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void resetController() {
|
|
||||||
controller.reset();
|
|
||||||
if (rpcTimeoutNs >= 0) {
|
|
||||||
controller.setCallTimeout(
|
|
||||||
(int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(rpcTimeoutNs)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void call(HRegionLocation loc) {
|
private void call(HRegionLocation loc) {
|
||||||
|
@ -197,7 +166,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
|
||||||
err -> conn.getLocator().updateCachedLocation(loc, err));
|
err -> conn.getLocator().updateCachedLocation(loc, err));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
resetController();
|
resetController(controller, rpcTimeoutNs);
|
||||||
callable.call(controller, loc, stub).whenComplete((result, error) -> {
|
callable.call(controller, loc, stub).whenComplete((result, error) -> {
|
||||||
if (error != null) {
|
if (error != null) {
|
||||||
onError(error,
|
onError(error,
|
||||||
|
|
|
@ -99,6 +99,21 @@ public interface AsyncTable {
|
||||||
*/
|
*/
|
||||||
long getOperationTimeout(TimeUnit unit);
|
long getOperationTimeout(TimeUnit unit);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set timeout of a single operation in a scan, such as openScanner and next. Will override the
|
||||||
|
* value {@code hbase.client.scanner.timeout.period} in configuration.
|
||||||
|
* <p>
|
||||||
|
* Generally a scan will never timeout after we add heartbeat support unless the region is
|
||||||
|
* crashed. The {@code scanTimeout} works like the {@code operationTimeout} for each single
|
||||||
|
* operation in a scan.
|
||||||
|
*/
|
||||||
|
void setScanTimeout(long timeout, TimeUnit unit);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the timeout of a single operation in a scan.
|
||||||
|
*/
|
||||||
|
long getScanTimeout(TimeUnit unit);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test for the existence of columns in the table, as specified by the Get.
|
* Test for the existence of columns in the table, as specified by the Get.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -335,4 +350,26 @@ public interface AsyncTable {
|
||||||
* {@link CompletableFuture}.
|
* {@link CompletableFuture}.
|
||||||
*/
|
*/
|
||||||
CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
|
CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The basic scan API uses the observer pattern. All results that match the given scan object will
|
||||||
|
* be passed to the given {@code consumer} by calling {@link ScanResultConsumer#onNext(Result[])}.
|
||||||
|
* {@link ScanResultConsumer#onComplete()} means the scan is finished, and
|
||||||
|
* {@link ScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan
|
||||||
|
* is terminated. {@link ScanResultConsumer#onHeartbeat()} means the RS is still working but we
|
||||||
|
* can not get a valid result to call {@link ScanResultConsumer#onNext(Result[])}. This is usually
|
||||||
|
* because the matched results are too sparse, for example, a filter which almost filters out
|
||||||
|
* everything is specified.
|
||||||
|
* <p>
|
||||||
|
* Notice that, the methods of the given {@code consumer} will be called directly in the rpc
|
||||||
|
* framework's callback thread, so typically you should not do any time consuming work inside
|
||||||
|
* these methods, otherwise you will be likely to block at least one connection to RS(even more if
|
||||||
|
* the rpc framework uses NIO).
|
||||||
|
* <p>
|
||||||
|
* This method is only for experts, do <strong>NOT</strong> use this method if you have other
|
||||||
|
* choice.
|
||||||
|
* @param scan A configured {@link Scan} object.
|
||||||
|
* @param consumer the consumer used to receive results.
|
||||||
|
*/
|
||||||
|
void scan(Scan scan, ScanResultConsumer consumer);
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,8 @@ import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
@ -55,6 +57,8 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
class AsyncTableImpl implements AsyncTable {
|
class AsyncTableImpl implements AsyncTable {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(AsyncTableImpl.class);
|
||||||
|
|
||||||
private final AsyncConnectionImpl conn;
|
private final AsyncConnectionImpl conn;
|
||||||
|
|
||||||
private final TableName tableName;
|
private final TableName tableName;
|
||||||
|
@ -348,6 +352,20 @@ class AsyncTableImpl implements AsyncTable {
|
||||||
.rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
|
.rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void scan(Scan scan, ScanResultConsumer consumer) {
|
||||||
|
if (scan.isSmall()) {
|
||||||
|
if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
|
||||||
|
consumer.onError(
|
||||||
|
new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
|
||||||
|
} else {
|
||||||
|
LOG.warn("This is small scan " + scan + ", consider using smallScan directly?");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
scan = setDefaultScanConfig(scan);
|
||||||
|
new AsyncClientScanner(scan, consumer, tableName, conn, scanTimeoutNs, readRpcTimeoutNs)
|
||||||
|
.start();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setReadRpcTimeout(long timeout, TimeUnit unit) {
|
public void setReadRpcTimeout(long timeout, TimeUnit unit) {
|
||||||
this.readRpcTimeoutNs = unit.toNanos(timeout);
|
this.readRpcTimeoutNs = unit.toNanos(timeout);
|
||||||
|
@ -377,4 +395,14 @@ class AsyncTableImpl implements AsyncTable {
|
||||||
public long getOperationTimeout(TimeUnit unit) {
|
public long getOperationTimeout(TimeUnit unit) {
|
||||||
return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
|
return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setScanTimeout(long timeout, TimeUnit unit) {
|
||||||
|
this.scanTimeoutNs = unit.toNanos(timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getScanTimeout(TimeUnit unit) {
|
||||||
|
return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit);
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public abstract class ClientScanner extends AbstractClientScanner {
|
public abstract class ClientScanner extends AbstractClientScanner {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(ClientScanner.class);
|
private static final Log LOG = LogFactory.getLog(ClientScanner.class);
|
||||||
|
|
||||||
protected Scan scan;
|
protected Scan scan;
|
||||||
|
|
|
@ -0,0 +1,97 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A scan result cache that only returns complete result.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class CompleteScanResultCache implements ScanResultCache {
|
||||||
|
|
||||||
|
private final List<Result> partialResults = new ArrayList<>();
|
||||||
|
|
||||||
|
private Result combine() throws IOException {
|
||||||
|
Result result = Result.createCompleteResult(partialResults);
|
||||||
|
partialResults.clear();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Result[] prependCombined(Result[] results, int length) throws IOException {
|
||||||
|
Result[] prependResults = new Result[length + 1];
|
||||||
|
prependResults[0] = combine();
|
||||||
|
System.arraycopy(results, 0, prependResults, 1, length);
|
||||||
|
return prependResults;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
|
||||||
|
// If no results were returned it indicates that either we have the all the partial results
|
||||||
|
// necessary to construct the complete result or the server had to send a heartbeat message
|
||||||
|
// to the client to keep the client-server connection alive
|
||||||
|
if (results.length == 0) {
|
||||||
|
// If this response was an empty heartbeat message, then we have not exhausted the region
|
||||||
|
// and thus there may be more partials server side that still need to be added to the partial
|
||||||
|
// list before we form the complete Result
|
||||||
|
if (!partialResults.isEmpty() && !isHeartbeatMessage) {
|
||||||
|
return new Result[] { combine() };
|
||||||
|
}
|
||||||
|
return EMPTY_RESULT_ARRAY;
|
||||||
|
}
|
||||||
|
// In every RPC response there should be at most a single partial result. Furthermore, if
|
||||||
|
// there is a partial result, it is guaranteed to be in the last position of the array.
|
||||||
|
Result last = results[results.length - 1];
|
||||||
|
if (last.isPartial()) {
|
||||||
|
if (partialResults.isEmpty()) {
|
||||||
|
partialResults.add(last);
|
||||||
|
return Arrays.copyOf(results, results.length - 1);
|
||||||
|
}
|
||||||
|
// We have only one result and it is partial
|
||||||
|
if (results.length == 1) {
|
||||||
|
// check if there is a row change
|
||||||
|
if (Bytes.equals(partialResults.get(0).getRow(), last.getRow())) {
|
||||||
|
partialResults.add(last);
|
||||||
|
return EMPTY_RESULT_ARRAY;
|
||||||
|
}
|
||||||
|
Result completeResult = combine();
|
||||||
|
partialResults.add(last);
|
||||||
|
return new Result[] { completeResult };
|
||||||
|
}
|
||||||
|
// We have some complete results
|
||||||
|
Result[] resultsToReturn = prependCombined(results, results.length - 1);
|
||||||
|
partialResults.add(last);
|
||||||
|
return resultsToReturn;
|
||||||
|
}
|
||||||
|
if (!partialResults.isEmpty()) {
|
||||||
|
return prependCombined(results, results.length);
|
||||||
|
}
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clear() {
|
||||||
|
partialResults.clear();
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,11 +24,13 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -37,11 +39,14 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.security.UserProvider;
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility used by client connections.
|
* Utility used by client connections.
|
||||||
|
@ -264,4 +269,25 @@ public final class ConnectionUtils {
|
||||||
static boolean isEmptyStopRow(byte[] row) {
|
static boolean isEmptyStopRow(byte[] row) {
|
||||||
return Bytes.equals(row, EMPTY_END_ROW);
|
return Bytes.equals(row, EMPTY_END_ROW);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void resetController(HBaseRpcController controller, long timeoutNs) {
|
||||||
|
controller.reset();
|
||||||
|
if (timeoutNs >= 0) {
|
||||||
|
controller.setCallTimeout(
|
||||||
|
(int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static Throwable translateException(Throwable t) {
|
||||||
|
if (t instanceof UndeclaredThrowableException && t.getCause() != null) {
|
||||||
|
t = t.getCause();
|
||||||
|
}
|
||||||
|
if (t instanceof RemoteException) {
|
||||||
|
t = ((RemoteException) t).unwrapRemoteException();
|
||||||
|
}
|
||||||
|
if (t instanceof ServiceException && t.getCause() != null) {
|
||||||
|
t = translateException(t.getCause());
|
||||||
|
}
|
||||||
|
return t;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to separate the row constructing logic.
|
||||||
|
* <p>
|
||||||
|
* After we add heartbeat support for scan, RS may return partial result even if allowPartial is
|
||||||
|
* false and batch is 0. With this interface, the implementation now looks like:
|
||||||
|
* <ol>
|
||||||
|
* <li>Get results from ScanResponse proto.</li>
|
||||||
|
* <li>Pass them to ScanResultCache and get something back.</li>
|
||||||
|
* <li>If we actually get something back, then pass it to ScanObserver.</li>
|
||||||
|
* </ol>
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
interface ScanResultCache {
|
||||||
|
|
||||||
|
static final Result[] EMPTY_RESULT_ARRAY = new Result[0];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add the given results to cache and get valid results back.
|
||||||
|
* @param results the results of a scan next. Must not be null.
|
||||||
|
* @param isHeartbeatMessage indicate whether the results is gotten from a heartbeat response.
|
||||||
|
* @return valid results, never null.
|
||||||
|
*/
|
||||||
|
Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear the cached result if any. Called when scan error and we will start from a start of a row
|
||||||
|
* again.
|
||||||
|
*/
|
||||||
|
void clear();
|
||||||
|
}
|
|
@ -0,0 +1,63 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Receives {@link Result} from an asynchronous scanner.
|
||||||
|
* <p>
|
||||||
|
* Notice that, the {@link #onNext(Result[])} method will be called in the thread which we send
|
||||||
|
* request to HBase service. So if you want the asynchronous scanner fetch data from HBase in
|
||||||
|
* background while you process the returned data, you need to move the processing work to another
|
||||||
|
* thread to make the {@code onNext} call return immediately. And please do NOT do any time
|
||||||
|
* consuming tasks in all methods below unless you know what you are doing.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public interface ScanResultConsumer {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param results the data fetched from HBase service.
|
||||||
|
* @return {@code false} if you want to stop the scanner process. Otherwise {@code true}
|
||||||
|
*/
|
||||||
|
boolean onNext(Result[] results);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicate that there is an heartbeat message but we have not cumulated enough cells to call
|
||||||
|
* onNext.
|
||||||
|
* <p>
|
||||||
|
* This method give you a chance to terminate a slow scan operation.
|
||||||
|
* @return {@code false} if you want to stop the scanner process. Otherwise {@code true}
|
||||||
|
*/
|
||||||
|
boolean onHeartbeat();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicate that we hit an unrecoverable error and the scan operation is terminated.
|
||||||
|
* <p>
|
||||||
|
* We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
|
||||||
|
*/
|
||||||
|
void onError(Throwable error);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicate that the scan operation is completed normally.
|
||||||
|
*/
|
||||||
|
void onComplete();
|
||||||
|
}
|
|
@ -0,0 +1,155 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public abstract class AbstractTestAsyncTableScan {
|
||||||
|
|
||||||
|
protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
protected static TableName TABLE_NAME = TableName.valueOf("async");
|
||||||
|
|
||||||
|
protected static byte[] FAMILY = Bytes.toBytes("cf");
|
||||||
|
|
||||||
|
protected static byte[] CQ1 = Bytes.toBytes("cq1");
|
||||||
|
|
||||||
|
protected static byte[] CQ2 = Bytes.toBytes("cq2");
|
||||||
|
|
||||||
|
protected static int COUNT = 1000;
|
||||||
|
|
||||||
|
protected static AsyncConnection ASYNC_CONN;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
TEST_UTIL.startMiniCluster(3);
|
||||||
|
byte[][] splitKeys = new byte[8][];
|
||||||
|
for (int i = 111; i < 999; i += 111) {
|
||||||
|
splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
|
||||||
|
}
|
||||||
|
TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
|
||||||
|
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
||||||
|
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
|
||||||
|
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||||
|
List<CompletableFuture<?>> futures = new ArrayList<>();
|
||||||
|
IntStream.range(0, COUNT).forEach(
|
||||||
|
i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
|
||||||
|
.addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))));
|
||||||
|
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
ASYNC_CONN.close();
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract Scan createScan();
|
||||||
|
|
||||||
|
protected abstract List<Result> doScan(AsyncTable table, Scan scan) throws Exception;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScanAll() throws Exception {
|
||||||
|
List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME), createScan());
|
||||||
|
assertEquals(COUNT, results.size());
|
||||||
|
IntStream.range(0, COUNT).forEach(i -> {
|
||||||
|
Result result = results.get(i);
|
||||||
|
assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
|
||||||
|
assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertResultEquals(Result result, int i) {
|
||||||
|
assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
|
||||||
|
assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
|
||||||
|
assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReversedScanAll() throws Exception {
|
||||||
|
List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME), createScan().setReversed(true));
|
||||||
|
assertEquals(COUNT, results.size());
|
||||||
|
IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScanNoStopKey() throws Exception {
|
||||||
|
int start = 345;
|
||||||
|
List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
|
||||||
|
createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))));
|
||||||
|
assertEquals(COUNT - start, results.size());
|
||||||
|
IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReverseScanNoStopKey() throws Exception {
|
||||||
|
int start = 765;
|
||||||
|
List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
|
||||||
|
createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true));
|
||||||
|
assertEquals(start + 1, results.size());
|
||||||
|
IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testScan(int start, int stop) throws Exception {
|
||||||
|
List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
|
||||||
|
createScan().setStartRow(Bytes.toBytes(String.format("%03d", start)))
|
||||||
|
.setStopRow(Bytes.toBytes(String.format("%03d", stop))));
|
||||||
|
assertEquals(stop - start, results.size());
|
||||||
|
IntStream.range(0, stop - start).forEach(i -> assertResultEquals(results.get(i), start + i));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testReversedScan(int start, int stop) throws Exception {
|
||||||
|
List<Result> results = doScan(ASYNC_CONN.getTable(TABLE_NAME),
|
||||||
|
createScan().setStartRow(Bytes.toBytes(String.format("%03d", start)))
|
||||||
|
.setStopRow(Bytes.toBytes(String.format("%03d", stop))).setReversed(true));
|
||||||
|
assertEquals(start - stop, results.size());
|
||||||
|
IntStream.range(0, start - stop).forEach(i -> assertResultEquals(results.get(i), start - i));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScanWithStartKeyAndStopKey() throws Exception {
|
||||||
|
testScan(345, 567);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReversedScanWithStartKeyAndStopKey() throws Exception {
|
||||||
|
testReversedScan(765, 543);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScanAtRegionBoundary() throws Exception {
|
||||||
|
testScan(222, 333);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReversedScanAtRegionBoundary() throws Exception {
|
||||||
|
testScan(222, 333);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,92 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ SmallTests.class, ClientTests.class })
|
||||||
|
public class TestAllowPartialScanResultCache {
|
||||||
|
|
||||||
|
private static byte[] CF = Bytes.toBytes("cf");
|
||||||
|
|
||||||
|
private AllowPartialScanResultCache resultCache;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
resultCache = new AllowPartialScanResultCache();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
resultCache.clear();
|
||||||
|
resultCache = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Cell createCell(int key, int cq) {
|
||||||
|
return new KeyValue(Bytes.toBytes(key), CF, Bytes.toBytes("cq" + cq), Bytes.toBytes(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws IOException {
|
||||||
|
assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
|
||||||
|
resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
|
||||||
|
assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
|
||||||
|
resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
|
||||||
|
|
||||||
|
Cell[] cells1 = IntStream.range(0, 10).mapToObj(i -> createCell(1, i)).toArray(Cell[]::new);
|
||||||
|
Cell[] cells2 = IntStream.range(0, 10).mapToObj(i -> createCell(2, i)).toArray(Cell[]::new);
|
||||||
|
|
||||||
|
Result[] results1 = resultCache.addAndGet(
|
||||||
|
new Result[] { Result.create(Arrays.copyOf(cells1, 5), null, false, true) }, false);
|
||||||
|
assertEquals(1, results1.length);
|
||||||
|
assertEquals(1, Bytes.toInt(results1[0].getRow()));
|
||||||
|
assertEquals(5, results1[0].rawCells().length);
|
||||||
|
IntStream.range(0, 5).forEach(
|
||||||
|
i -> assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" + i)))));
|
||||||
|
|
||||||
|
Result[] results2 = resultCache.addAndGet(
|
||||||
|
new Result[] { Result.create(Arrays.copyOfRange(cells1, 1, 10), null, false, true) }, false);
|
||||||
|
assertEquals(1, results2.length);
|
||||||
|
assertEquals(1, Bytes.toInt(results2[0].getRow()));
|
||||||
|
assertEquals(5, results2[0].rawCells().length);
|
||||||
|
IntStream.range(5, 10).forEach(
|
||||||
|
i -> assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" + i)))));
|
||||||
|
|
||||||
|
Result[] results3 = resultCache
|
||||||
|
.addAndGet(new Result[] { Result.create(cells1), Result.create(cells2) }, false);
|
||||||
|
assertEquals(1, results3.length);
|
||||||
|
assertEquals(2, Bytes.toInt(results3[0].getRow()));
|
||||||
|
assertEquals(10, results3[0].rawCells().length);
|
||||||
|
IntStream.range(0, 10).forEach(
|
||||||
|
i -> assertEquals(2, Bytes.toInt(results3[0].getValue(CF, Bytes.toBytes("cq" + i)))));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,147 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
|
import java.util.ArrayDeque;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runners.Parameterized.Parameter;
|
||||||
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
@Category({ MediumTests.class, ClientTests.class })
|
||||||
|
public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
|
||||||
|
|
||||||
|
private static final class SimpleScanResultConsumer implements ScanResultConsumer {
|
||||||
|
|
||||||
|
private final Queue<Result> queue = new ArrayDeque<>();
|
||||||
|
|
||||||
|
private boolean finished;
|
||||||
|
|
||||||
|
private Throwable error;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized boolean onNext(Result[] results) {
|
||||||
|
for (Result result : results) {
|
||||||
|
queue.offer(result);
|
||||||
|
}
|
||||||
|
notifyAll();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean onHeartbeat() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void onError(Throwable error) {
|
||||||
|
finished = true;
|
||||||
|
this.error = error;
|
||||||
|
notifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void onComplete() {
|
||||||
|
finished = true;
|
||||||
|
notifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized Result take() throws IOException, InterruptedException {
|
||||||
|
for (;;) {
|
||||||
|
if (!queue.isEmpty()) {
|
||||||
|
return queue.poll();
|
||||||
|
}
|
||||||
|
if (finished) {
|
||||||
|
if (error != null) {
|
||||||
|
Throwables.propagateIfPossible(error, IOException.class);
|
||||||
|
throw new IOException(error);
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameter
|
||||||
|
public Supplier<Scan> scanCreater;
|
||||||
|
|
||||||
|
@Parameters
|
||||||
|
public static List<Object[]> params() {
|
||||||
|
return Arrays.asList(new Supplier<?>[] { TestAsyncTableScan::createNormalScan },
|
||||||
|
new Supplier<?>[] { TestAsyncTableScan::createBatchScan });
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Scan createNormalScan() {
|
||||||
|
return new Scan();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Scan createBatchScan() {
|
||||||
|
return new Scan().setBatch(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Scan createScan() {
|
||||||
|
return scanCreater.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Result convertToPartial(Result result) {
|
||||||
|
return Result.create(result.rawCells(), result.getExists(), result.isStale(), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<Result> doScan(AsyncTable table, Scan scan) throws Exception {
|
||||||
|
SimpleScanResultConsumer scanObserver = new SimpleScanResultConsumer();
|
||||||
|
table.scan(scan, scanObserver);
|
||||||
|
List<Result> results = new ArrayList<>();
|
||||||
|
for (Result result; (result = scanObserver.take()) != null;) {
|
||||||
|
results.add(result);
|
||||||
|
}
|
||||||
|
if (scan.getBatch() > 0) {
|
||||||
|
assertTrue(results.size() % 2 == 0);
|
||||||
|
return IntStream.range(0, results.size() / 2).mapToObj(i -> {
|
||||||
|
try {
|
||||||
|
return Result.createCompleteResult(Arrays.asList(convertToPartial(results.get(2 * i)),
|
||||||
|
convertToPartial(results.get(2 * i + 1))));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new UncheckedIOException(e);
|
||||||
|
}
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,166 +19,18 @@ package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
@Category({ MediumTests.class, ClientTests.class })
|
@Category({ MediumTests.class, ClientTests.class })
|
||||||
public class TestAsyncTableSmallScan {
|
public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan {
|
||||||
|
|
||||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
|
||||||
|
|
||||||
private static TableName TABLE_NAME = TableName.valueOf("async");
|
|
||||||
|
|
||||||
private static byte[] FAMILY = Bytes.toBytes("cf");
|
|
||||||
|
|
||||||
private static byte[] QUALIFIER = Bytes.toBytes("cq");
|
|
||||||
|
|
||||||
private static int COUNT = 1000;
|
|
||||||
|
|
||||||
private static AsyncConnection ASYNC_CONN;
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setUp() throws Exception {
|
|
||||||
TEST_UTIL.startMiniCluster(3);
|
|
||||||
byte[][] splitKeys = new byte[8][];
|
|
||||||
for (int i = 111; i < 999; i += 111) {
|
|
||||||
splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
|
|
||||||
}
|
|
||||||
TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
|
|
||||||
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
|
||||||
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
|
|
||||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
|
||||||
List<CompletableFuture<?>> futures = new ArrayList<>();
|
|
||||||
IntStream.range(0, COUNT)
|
|
||||||
.forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
|
|
||||||
.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))));
|
|
||||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void tearDown() throws Exception {
|
|
||||||
ASYNC_CONN.close();
|
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testScanAll() throws InterruptedException, ExecutionException {
|
|
||||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
|
||||||
List<Result> results = table.smallScan(new Scan().setSmall(true)).get();
|
|
||||||
assertEquals(COUNT, results.size());
|
|
||||||
IntStream.range(0, COUNT).forEach(i -> {
|
|
||||||
Result result = results.get(i);
|
|
||||||
assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
|
|
||||||
assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReversedScanAll() throws InterruptedException, ExecutionException {
|
|
||||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
|
||||||
List<Result> results = table.smallScan(new Scan().setSmall(true).setReversed(true)).get();
|
|
||||||
assertEquals(COUNT, results.size());
|
|
||||||
IntStream.range(0, COUNT).forEach(i -> {
|
|
||||||
Result result = results.get(i);
|
|
||||||
int actualIndex = COUNT - i - 1;
|
|
||||||
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
|
|
||||||
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testScanNoStopKey() throws InterruptedException, ExecutionException {
|
|
||||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
|
||||||
int start = 345;
|
|
||||||
List<Result> results = table
|
|
||||||
.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true)).get();
|
|
||||||
assertEquals(COUNT - start, results.size());
|
|
||||||
IntStream.range(0, COUNT - start).forEach(i -> {
|
|
||||||
Result result = results.get(i);
|
|
||||||
int actualIndex = start + i;
|
|
||||||
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
|
|
||||||
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReverseScanNoStopKey() throws InterruptedException, ExecutionException {
|
|
||||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
|
||||||
int start = 765;
|
|
||||||
List<Result> results = table
|
|
||||||
.smallScan(
|
|
||||||
new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true).setReversed(true))
|
|
||||||
.get();
|
|
||||||
assertEquals(start + 1, results.size());
|
|
||||||
IntStream.range(0, start + 1).forEach(i -> {
|
|
||||||
Result result = results.get(i);
|
|
||||||
int actualIndex = start - i;
|
|
||||||
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
|
|
||||||
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private void testScan(int start, int stop) throws InterruptedException, ExecutionException {
|
|
||||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
|
||||||
List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
|
|
||||||
.setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true)).get();
|
|
||||||
assertEquals(stop - start, results.size());
|
|
||||||
IntStream.range(0, stop - start).forEach(i -> {
|
|
||||||
Result result = results.get(i);
|
|
||||||
int actualIndex = start + i;
|
|
||||||
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
|
|
||||||
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private void testReversedScan(int start, int stop)
|
|
||||||
throws InterruptedException, ExecutionException {
|
|
||||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
|
||||||
List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
|
|
||||||
.setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true))
|
|
||||||
.get();
|
|
||||||
assertEquals(start - stop, results.size());
|
|
||||||
IntStream.range(0, start - stop).forEach(i -> {
|
|
||||||
Result result = results.get(i);
|
|
||||||
int actualIndex = start - i;
|
|
||||||
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
|
|
||||||
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testScanWithStartKeyAndStopKey() throws InterruptedException, ExecutionException {
|
|
||||||
testScan(345, 567);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReversedScanWithStartKeyAndStopKey()
|
|
||||||
throws InterruptedException, ExecutionException {
|
|
||||||
testReversedScan(765, 543);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testScanAtRegionBoundary() throws InterruptedException, ExecutionException {
|
|
||||||
testScan(222, 333);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReversedScanAtRegionBoundary() throws InterruptedException, ExecutionException {
|
|
||||||
testScan(222, 333);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testScanWithLimit() throws InterruptedException, ExecutionException {
|
public void testScanWithLimit() throws InterruptedException, ExecutionException {
|
||||||
|
@ -194,7 +46,7 @@ public class TestAsyncTableSmallScan {
|
||||||
Result result = results.get(i);
|
Result result = results.get(i);
|
||||||
int actualIndex = start + i;
|
int actualIndex = start + i;
|
||||||
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
|
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
|
||||||
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
|
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -213,7 +65,17 @@ public class TestAsyncTableSmallScan {
|
||||||
Result result = results.get(i);
|
Result result = results.get(i);
|
||||||
int actualIndex = start - i;
|
int actualIndex = start - i;
|
||||||
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
|
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
|
||||||
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
|
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Scan createScan() {
|
||||||
|
return new Scan().setSmall(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<Result> doScan(AsyncTable table, Scan scan) throws Exception {
|
||||||
|
return table.smallScan(scan).get();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,159 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertSame;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ SmallTests.class, ClientTests.class })
|
||||||
|
public class TestCompleteResultScanResultCache {
|
||||||
|
|
||||||
|
private static byte[] CF = Bytes.toBytes("cf");
|
||||||
|
|
||||||
|
private static byte[] CQ1 = Bytes.toBytes("cq1");
|
||||||
|
|
||||||
|
private static byte[] CQ2 = Bytes.toBytes("cq2");
|
||||||
|
|
||||||
|
private static byte[] CQ3 = Bytes.toBytes("cq3");
|
||||||
|
|
||||||
|
private CompleteScanResultCache resultCache;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
resultCache = new CompleteScanResultCache();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
resultCache.clear();
|
||||||
|
resultCache = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Cell createCell(int key, byte[] cq) {
|
||||||
|
return new KeyValue(Bytes.toBytes(key), CF, cq, Bytes.toBytes(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoPartial() throws IOException {
|
||||||
|
assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
|
||||||
|
resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
|
||||||
|
assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
|
||||||
|
resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
|
||||||
|
int count = 10;
|
||||||
|
Result[] results = new Result[count];
|
||||||
|
IntStream.range(0, count).forEach(i -> {
|
||||||
|
results[i] = Result.create(Arrays.asList(createCell(i, CQ1)));
|
||||||
|
});
|
||||||
|
assertSame(results, resultCache.addAndGet(results, false));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCombine1() throws IOException {
|
||||||
|
Result previousResult = Result.create(Arrays.asList(createCell(0, CQ1)), null, false, true);
|
||||||
|
Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
|
||||||
|
Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
|
||||||
|
Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true);
|
||||||
|
Result[] results = resultCache.addAndGet(new Result[] { previousResult, result1 }, false);
|
||||||
|
assertEquals(1, results.length);
|
||||||
|
assertSame(previousResult, results[0]);
|
||||||
|
|
||||||
|
assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
|
||||||
|
assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length);
|
||||||
|
assertEquals(0, resultCache.addAndGet(new Result[0], true).length);
|
||||||
|
|
||||||
|
results = resultCache.addAndGet(new Result[0], false);
|
||||||
|
assertEquals(1, results.length);
|
||||||
|
assertEquals(1, Bytes.toInt(results[0].getRow()));
|
||||||
|
assertEquals(3, results[0].rawCells().length);
|
||||||
|
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
|
||||||
|
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
|
||||||
|
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ3)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCombine2() throws IOException {
|
||||||
|
Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
|
||||||
|
Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
|
||||||
|
Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true);
|
||||||
|
Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true);
|
||||||
|
Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ2)), null, false, false);
|
||||||
|
|
||||||
|
assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
|
||||||
|
assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
|
||||||
|
assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length);
|
||||||
|
|
||||||
|
Result[] results = resultCache.addAndGet(new Result[] { nextResult1 }, false);
|
||||||
|
assertEquals(1, results.length);
|
||||||
|
assertEquals(1, Bytes.toInt(results[0].getRow()));
|
||||||
|
assertEquals(3, results[0].rawCells().length);
|
||||||
|
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
|
||||||
|
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
|
||||||
|
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ3)));
|
||||||
|
|
||||||
|
results = resultCache.addAndGet(new Result[] { nextToNextResult1 }, false);
|
||||||
|
assertEquals(2, results.length);
|
||||||
|
assertEquals(2, Bytes.toInt(results[0].getRow()));
|
||||||
|
assertEquals(1, results[0].rawCells().length);
|
||||||
|
assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ1)));
|
||||||
|
assertEquals(3, Bytes.toInt(results[1].getRow()));
|
||||||
|
assertEquals(1, results[1].rawCells().length);
|
||||||
|
assertEquals(3, Bytes.toInt(results[1].getValue(CF, CQ2)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCombine3() throws IOException {
|
||||||
|
Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
|
||||||
|
Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
|
||||||
|
Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, false);
|
||||||
|
Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ1)), null, false, true);
|
||||||
|
|
||||||
|
assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
|
||||||
|
assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
|
||||||
|
|
||||||
|
Result[] results = resultCache.addAndGet(new Result[] { nextResult1, nextToNextResult1 },
|
||||||
|
false);
|
||||||
|
assertEquals(2, results.length);
|
||||||
|
assertEquals(1, Bytes.toInt(results[0].getRow()));
|
||||||
|
assertEquals(2, results[0].rawCells().length);
|
||||||
|
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
|
||||||
|
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
|
||||||
|
assertEquals(2, Bytes.toInt(results[1].getRow()));
|
||||||
|
assertEquals(1, results[1].rawCells().length);
|
||||||
|
assertEquals(2, Bytes.toInt(results[1].getValue(CF, CQ1)));
|
||||||
|
|
||||||
|
results = resultCache.addAndGet(new Result[0], false);
|
||||||
|
assertEquals(1, results.length);
|
||||||
|
assertEquals(3, Bytes.toInt(results[0].getRow()));
|
||||||
|
assertEquals(1, results[0].rawCells().length);
|
||||||
|
assertEquals(3, Bytes.toInt(results[0].getValue(CF, CQ1)));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue