HBASE-18489 Expose scan cursor in RawScanResultConsumer
This commit is contained in:
parent
a902175553
commit
64345a5b5a
|
@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
|
||||||
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
||||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
|
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||||
|
@ -144,7 +145,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
||||||
private final class ScanControllerImpl implements RawScanResultConsumer.ScanController {
|
private final class ScanControllerImpl implements RawScanResultConsumer.ScanController {
|
||||||
|
|
||||||
// Make sure the methods are only called in this thread.
|
// Make sure the methods are only called in this thread.
|
||||||
private final Thread callerThread = Thread.currentThread();
|
private final Thread callerThread;
|
||||||
|
|
||||||
|
private final Optional<Cursor> cursor;
|
||||||
|
|
||||||
// INITIALIZED -> SUSPENDED -> DESTROYED
|
// INITIALIZED -> SUSPENDED -> DESTROYED
|
||||||
// INITIALIZED -> TERMINATED -> DESTROYED
|
// INITIALIZED -> TERMINATED -> DESTROYED
|
||||||
|
@ -154,6 +157,12 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
||||||
|
|
||||||
private ScanResumerImpl resumer;
|
private ScanResumerImpl resumer;
|
||||||
|
|
||||||
|
public ScanControllerImpl(ScanResponse resp) {
|
||||||
|
callerThread = Thread.currentThread();
|
||||||
|
cursor = resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor()))
|
||||||
|
: Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
private void preCheck() {
|
private void preCheck() {
|
||||||
Preconditions.checkState(Thread.currentThread() == callerThread,
|
Preconditions.checkState(Thread.currentThread() == callerThread,
|
||||||
"The current thread is %s, expected thread is %s, " +
|
"The current thread is %s, expected thread is %s, " +
|
||||||
|
@ -184,6 +193,11 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
||||||
this.state = ScanControllerState.DESTROYED;
|
this.state = ScanControllerState.DESTROYED;
|
||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<Cursor> cursor() {
|
||||||
|
return cursor;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private enum ScanResumerState {
|
private enum ScanResumerState {
|
||||||
|
@ -479,7 +493,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
ScanControllerImpl scanController = new ScanControllerImpl();
|
ScanControllerImpl scanController = new ScanControllerImpl(resp);
|
||||||
if (results.length > 0) {
|
if (results.length > 0) {
|
||||||
updateNextStartRowWhenError(results[results.length - 1]);
|
updateNextStartRowWhenError(results[results.length - 1]);
|
||||||
consumer.onNext(results, scanController);
|
consumer.onNext(results, scanController);
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||||
|
@ -47,14 +49,14 @@ public interface RawScanResultConsumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to suspend or stop a scan.
|
* Used to suspend or stop a scan, or get a scan cursor if available.
|
||||||
* <p>
|
* <p>
|
||||||
* Notice that, you should only call the methods below inside onNext or onHeartbeat method. A
|
* Notice that, you should only call the {@link #suspend()} or {@link #terminate()} inside onNext
|
||||||
* IllegalStateException will be thrown if you call them at other places.
|
* or onHeartbeat method. A IllegalStateException will be thrown if you call them at other places.
|
||||||
* <p>
|
* <p>
|
||||||
* You can only call one of the methods below, i.e., call suspend or terminate(of course you are
|
* You can only call one of the {@link #suspend()} and {@link #terminate()} methods(of course you
|
||||||
* free to not call them both), and the methods are not reentrant. A IllegalStateException will be
|
* are free to not call them both), and the methods are not reentrant. An IllegalStateException
|
||||||
* thrown if you have already called one of the methods.
|
* will be thrown if you have already called one of the methods.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
interface ScanController {
|
interface ScanController {
|
||||||
|
@ -75,6 +77,12 @@ public interface RawScanResultConsumer {
|
||||||
* or you want to stop the scan in onHeartbeat method because it has spent too many time.
|
* or you want to stop the scan in onHeartbeat method because it has spent too many time.
|
||||||
*/
|
*/
|
||||||
void terminate();
|
void terminate();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the scan cursor if available.
|
||||||
|
* @return The scan cursor.
|
||||||
|
*/
|
||||||
|
Optional<Cursor> cursor();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -18,10 +18,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.cache.Cache;
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.cache.CacheBuilder;
|
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
|
@ -119,6 +115,9 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
import org.apache.hadoop.hbase.security.Superusers;
|
import org.apache.hadoop.hbase.security.Superusers;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.hbase.shaded.com.google.common.cache.Cache;
|
||||||
|
import org.apache.hadoop.hbase.shaded.com.google.common.cache.CacheBuilder;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
|
||||||
|
@ -221,8 +220,6 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
|
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements the regionserver RPC services.
|
* Implements the regionserver RPC services.
|
||||||
*/
|
*/
|
||||||
|
@ -3108,9 +3105,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
// Heartbeat messages occur when the time limit has been reached.
|
// Heartbeat messages occur when the time limit has been reached.
|
||||||
builder.setHeartbeatMessage(timeLimitReached);
|
builder.setHeartbeatMessage(timeLimitReached);
|
||||||
if (timeLimitReached && rsh.needCursor) {
|
if (timeLimitReached && rsh.needCursor) {
|
||||||
Cell readingCell = scannerContext.getPeekedCellInHeartbeat();
|
Cell cursorCell = scannerContext.getLastPeekedCell();
|
||||||
if (readingCell != null ) {
|
if (cursorCell != null ) {
|
||||||
builder.setCursor(ProtobufUtil.toCursor(readingCell));
|
builder.setCursor(ProtobufUtil.toCursor(cursorCell));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
@ -97,7 +95,7 @@ public class ScannerContext {
|
||||||
boolean keepProgress;
|
boolean keepProgress;
|
||||||
private static boolean DEFAULT_KEEP_PROGRESS = false;
|
private static boolean DEFAULT_KEEP_PROGRESS = false;
|
||||||
|
|
||||||
private Cell peekedCellInHeartbeat = null;
|
private Cell lastPeekedCell = null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tracks the relevant server side metrics during scans. null when metrics should not be tracked
|
* Tracks the relevant server side metrics during scans. null when metrics should not be tracked
|
||||||
|
@ -333,12 +331,12 @@ public class ScannerContext {
|
||||||
|| checkTimeLimit(checkerScope);
|
|| checkTimeLimit(checkerScope);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Cell getPeekedCellInHeartbeat() {
|
Cell getLastPeekedCell() {
|
||||||
return peekedCellInHeartbeat;
|
return lastPeekedCell;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setPeekedCellInHeartbeat(Cell peekedCellInHeartbeat) {
|
void setLastPeekedCell(Cell lastPeekedCell) {
|
||||||
this.peekedCellInHeartbeat = peekedCellInHeartbeat;
|
this.lastPeekedCell = lastPeekedCell;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -48,11 +48,10 @@ import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatc
|
||||||
import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher;
|
import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher;
|
||||||
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
|
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
|
||||||
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
|
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
|
||||||
|
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.hbase.util.CollectionUtils;
|
import org.apache.hadoop.hbase.util.CollectionUtils;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List<KeyValue>
|
* Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List<KeyValue>
|
||||||
* for a single row.
|
* for a single row.
|
||||||
|
@ -105,7 +104,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
* KVs skipped via seeking to next row/column. TODO: estimate them?
|
* KVs skipped via seeking to next row/column. TODO: estimate them?
|
||||||
*/
|
*/
|
||||||
private long kvsScanned = 0;
|
private long kvsScanned = 0;
|
||||||
private Cell prevCell = null;
|
protected Cell prevCell = null;
|
||||||
|
|
||||||
private final long preadMaxBytes;
|
private final long preadMaxBytes;
|
||||||
private long bytesRead;
|
private long bytesRead;
|
||||||
|
@ -593,7 +592,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
|
if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
|
||||||
scannerContext.updateTimeProgress();
|
scannerContext.updateTimeProgress();
|
||||||
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
|
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
|
||||||
scannerContext.setPeekedCellInHeartbeat(prevCell);
|
|
||||||
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
|
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -605,6 +603,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
|
int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
|
||||||
bytesRead += cellSize;
|
bytesRead += cellSize;
|
||||||
prevCell = cell;
|
prevCell = cell;
|
||||||
|
scannerContext.setLastPeekedCell(cell);
|
||||||
topChanged = false;
|
topChanged = false;
|
||||||
ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
|
ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
|
||||||
switch (qcode) {
|
switch (qcode) {
|
||||||
|
|
|
@ -0,0 +1,146 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HTestConst;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
|
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.StoreScanner;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
|
public abstract class AbstractTestScanCursor {
|
||||||
|
|
||||||
|
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Table configuration
|
||||||
|
*/
|
||||||
|
protected static TableName TABLE_NAME = TableName.valueOf("TestScanCursor");
|
||||||
|
|
||||||
|
protected static int NUM_ROWS = 5;
|
||||||
|
protected static byte[] ROW = Bytes.toBytes("testRow");
|
||||||
|
protected static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
|
||||||
|
|
||||||
|
protected static int NUM_FAMILIES = 2;
|
||||||
|
protected static byte[] FAMILY = Bytes.toBytes("testFamily");
|
||||||
|
protected static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
|
||||||
|
|
||||||
|
protected static int NUM_QUALIFIERS = 2;
|
||||||
|
protected static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
|
||||||
|
protected static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
|
||||||
|
|
||||||
|
protected static int VALUE_SIZE = 10;
|
||||||
|
protected static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
|
||||||
|
|
||||||
|
protected static final int TIMEOUT = 4000;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
|
||||||
|
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, TIMEOUT);
|
||||||
|
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, TIMEOUT);
|
||||||
|
|
||||||
|
// Check the timeout condition after every cell
|
||||||
|
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
|
||||||
|
TEST_UTIL.startMiniCluster(1);
|
||||||
|
|
||||||
|
createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void createTestTable(TableName name, byte[][] rows, byte[][] families,
|
||||||
|
byte[][] qualifiers, byte[] cellValue) throws IOException {
|
||||||
|
TEST_UTIL.createTable(name, families).put(createPuts(rows, families, qualifiers, cellValue));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
|
||||||
|
byte[] value) throws IOException {
|
||||||
|
List<Put> puts = new ArrayList<>();
|
||||||
|
for (int row = 0; row < rows.length; row++) {
|
||||||
|
Put put = new Put(rows[row]);
|
||||||
|
for (int fam = 0; fam < families.length; fam++) {
|
||||||
|
for (int qual = 0; qual < qualifiers.length; qual++) {
|
||||||
|
KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
|
||||||
|
put.add(kv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
puts.add(put);
|
||||||
|
}
|
||||||
|
return puts;
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class SparseFilter extends FilterBase {
|
||||||
|
|
||||||
|
private final boolean reversed;
|
||||||
|
|
||||||
|
public SparseFilter(boolean reversed) {
|
||||||
|
this.reversed = reversed;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReturnCode filterKeyValue(Cell v) throws IOException {
|
||||||
|
Threads.sleep(TIMEOUT / 2 + 100);
|
||||||
|
return Bytes.equals(CellUtil.cloneRow(v), ROWS[reversed ? 0 : NUM_ROWS - 1])
|
||||||
|
? ReturnCode.INCLUDE
|
||||||
|
: ReturnCode.SKIP;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] toByteArray() throws IOException {
|
||||||
|
return reversed ? new byte[] { 1 } : new byte[] { 0 };
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Filter parseFrom(final byte[] pbBytes) {
|
||||||
|
return new SparseFilter(pbBytes[0] != 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Scan createScanWithSparseFilter() {
|
||||||
|
return new Scan().setMaxResultSize(Long.MAX_VALUE).setCaching(Integer.MAX_VALUE)
|
||||||
|
.setNeedCursorResult(true).setAllowPartialResults(true).setFilter(new SparseFilter(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Scan createReversedScanWithSparseFilter() {
|
||||||
|
return new Scan().setMaxResultSize(Long.MAX_VALUE).setCaching(Integer.MAX_VALUE)
|
||||||
|
.setReversed(true).setNeedCursorResult(true).setAllowPartialResults(true)
|
||||||
|
.setFilter(new SparseFilter(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Scan createScanWithSizeLimit() {
|
||||||
|
return new Scan().setMaxResultSize(1).setCaching(Integer.MAX_VALUE).setNeedCursorResult(true);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,107 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ MediumTests.class, ClientTests.class })
|
||||||
|
public class TestRawAsyncScanCursor extends AbstractTestScanCursor {
|
||||||
|
|
||||||
|
private void doTest(boolean reversed)
|
||||||
|
throws InterruptedException, ExecutionException, IOException {
|
||||||
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
try (AsyncConnection conn =
|
||||||
|
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
|
||||||
|
RawAsyncTable table = conn.getRawTable(TABLE_NAME);
|
||||||
|
table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(),
|
||||||
|
new RawScanResultConsumer() {
|
||||||
|
|
||||||
|
private int count;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onHeartbeat(ScanController controller) {
|
||||||
|
int row = count / NUM_FAMILIES / NUM_QUALIFIERS;
|
||||||
|
if (reversed) {
|
||||||
|
row = NUM_ROWS - 1 - row;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
assertArrayEquals(ROWS[row], controller.cursor().get().getRow());
|
||||||
|
count++;
|
||||||
|
} catch (Throwable e) {
|
||||||
|
future.completeExceptionally(e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onNext(Result[] results, ScanController controller) {
|
||||||
|
try {
|
||||||
|
assertEquals(1, results.length);
|
||||||
|
assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS);
|
||||||
|
// we will always provide a scan cursor if time limit is reached.
|
||||||
|
if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) {
|
||||||
|
assertFalse(controller.cursor().isPresent());
|
||||||
|
} else {
|
||||||
|
assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1],
|
||||||
|
controller.cursor().get().getRow());
|
||||||
|
}
|
||||||
|
assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow());
|
||||||
|
count++;
|
||||||
|
} catch (Throwable e) {
|
||||||
|
future.completeExceptionally(e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable error) {
|
||||||
|
future.completeExceptionally(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
future.complete(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
future.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHeartbeatWithSparseFilter()
|
||||||
|
throws IOException, InterruptedException, ExecutionException {
|
||||||
|
doTest(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHeartbeatWithSparseFilterReversed()
|
||||||
|
throws IOException, InterruptedException, ExecutionException {
|
||||||
|
doTest(true);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,90 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ MediumTests.class, ClientTests.class })
|
||||||
|
public class TestScanCursor extends AbstractTestScanCursor {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHeartbeatWithSparseFilter() throws Exception {
|
||||||
|
try (ResultScanner scanner =
|
||||||
|
TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSparseFilter())) {
|
||||||
|
int num = 0;
|
||||||
|
Result r;
|
||||||
|
while ((r = scanner.next()) != null) {
|
||||||
|
if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
|
||||||
|
Assert.assertTrue(r.isCursor());
|
||||||
|
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
|
||||||
|
r.getCursor().getRow());
|
||||||
|
} else {
|
||||||
|
Assert.assertFalse(r.isCursor());
|
||||||
|
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
|
||||||
|
}
|
||||||
|
num++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHeartbeatWithSparseFilterReversed() throws Exception {
|
||||||
|
try (ResultScanner scanner = TEST_UTIL.getConnection().getTable(TABLE_NAME)
|
||||||
|
.getScanner(createReversedScanWithSparseFilter())) {
|
||||||
|
int num = 0;
|
||||||
|
Result r;
|
||||||
|
while ((r = scanner.next()) != null) {
|
||||||
|
if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
|
||||||
|
Assert.assertTrue(r.isCursor());
|
||||||
|
Assert.assertArrayEquals(ROWS[NUM_ROWS - 1 - num / NUM_FAMILIES / NUM_QUALIFIERS],
|
||||||
|
r.getCursor().getRow());
|
||||||
|
} else {
|
||||||
|
Assert.assertFalse(r.isCursor());
|
||||||
|
Assert.assertArrayEquals(ROWS[0], r.getRow());
|
||||||
|
}
|
||||||
|
num++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSizeLimit() throws IOException {
|
||||||
|
try (ResultScanner scanner =
|
||||||
|
TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSizeLimit())) {
|
||||||
|
int num = 0;
|
||||||
|
Result r;
|
||||||
|
while ((r = scanner.next()) != null) {
|
||||||
|
if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS) - 1) {
|
||||||
|
Assert.assertTrue(r.isCursor());
|
||||||
|
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS],
|
||||||
|
r.getCursor().getRow());
|
||||||
|
} else {
|
||||||
|
Assert.assertFalse(r.isCursor());
|
||||||
|
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
|
||||||
|
}
|
||||||
|
num++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,191 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.Cell;
|
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.HTestConst;
|
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
|
||||||
import org.apache.hadoop.hbase.filter.Filter;
|
|
||||||
import org.apache.hadoop.hbase.filter.FilterBase;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
|
|
||||||
@Category(MediumTests.class)
|
|
||||||
public class TestScannerCursor {
|
|
||||||
|
|
||||||
private static final Log LOG =
|
|
||||||
LogFactory.getLog(TestScannerCursor.class);
|
|
||||||
|
|
||||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
|
||||||
|
|
||||||
private static Table TABLE = null;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Table configuration
|
|
||||||
*/
|
|
||||||
private static TableName TABLE_NAME = TableName.valueOf("TestScannerCursor");
|
|
||||||
|
|
||||||
private static int NUM_ROWS = 5;
|
|
||||||
private static byte[] ROW = Bytes.toBytes("testRow");
|
|
||||||
private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
|
|
||||||
|
|
||||||
private static int NUM_FAMILIES = 2;
|
|
||||||
private static byte[] FAMILY = Bytes.toBytes("testFamily");
|
|
||||||
private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
|
|
||||||
|
|
||||||
private static int NUM_QUALIFIERS = 2;
|
|
||||||
private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
|
|
||||||
private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
|
|
||||||
|
|
||||||
private static int VALUE_SIZE = 10;
|
|
||||||
private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
|
|
||||||
|
|
||||||
private static final int TIMEOUT = 4000;
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setUpBeforeClass() throws Exception {
|
|
||||||
Configuration conf = TEST_UTIL.getConfiguration();
|
|
||||||
|
|
||||||
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, TIMEOUT);
|
|
||||||
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, TIMEOUT);
|
|
||||||
|
|
||||||
// Check the timeout condition after every cell
|
|
||||||
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
|
|
||||||
TEST_UTIL.startMiniCluster(1);
|
|
||||||
|
|
||||||
TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
|
|
||||||
byte[][] qualifiers, byte[] cellValue) throws IOException {
|
|
||||||
Table ht = TEST_UTIL.createTable(name, families);
|
|
||||||
List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
|
|
||||||
ht.put(puts);
|
|
||||||
return ht;
|
|
||||||
}
|
|
||||||
|
|
||||||
static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
|
|
||||||
byte[] value) throws IOException {
|
|
||||||
Put put;
|
|
||||||
ArrayList<Put> puts = new ArrayList<>();
|
|
||||||
|
|
||||||
for (int row = 0; row < rows.length; row++) {
|
|
||||||
put = new Put(rows[row]);
|
|
||||||
for (int fam = 0; fam < families.length; fam++) {
|
|
||||||
for (int qual = 0; qual < qualifiers.length; qual++) {
|
|
||||||
KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
|
|
||||||
put.add(kv);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
puts.add(put);
|
|
||||||
}
|
|
||||||
|
|
||||||
return puts;
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void tearDownAfterClass() throws Exception {
|
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class SparseFilter extends FilterBase {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ReturnCode filterKeyValue(Cell v) throws IOException {
|
|
||||||
Threads.sleep(TIMEOUT / 2 + 100);
|
|
||||||
return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? ReturnCode.INCLUDE
|
|
||||||
: ReturnCode.SKIP;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Filter parseFrom(final byte[] pbBytes) {
|
|
||||||
return new SparseFilter();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testHeartbeatWithSparseFilter() throws Exception {
|
|
||||||
Scan scan = new Scan();
|
|
||||||
scan.setMaxResultSize(Long.MAX_VALUE);
|
|
||||||
scan.setCaching(Integer.MAX_VALUE);
|
|
||||||
scan.setNeedCursorResult(true);
|
|
||||||
scan.setAllowPartialResults(true);
|
|
||||||
scan.setFilter(new SparseFilter());
|
|
||||||
try(ResultScanner scanner = TABLE.getScanner(scan)) {
|
|
||||||
int num = 0;
|
|
||||||
Result r;
|
|
||||||
while ((r = scanner.next()) != null) {
|
|
||||||
|
|
||||||
if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) {
|
|
||||||
Assert.assertTrue(r.isCursor());
|
|
||||||
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getCursor().getRow());
|
|
||||||
} else {
|
|
||||||
Assert.assertFalse(r.isCursor());
|
|
||||||
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
|
|
||||||
}
|
|
||||||
num++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSizeLimit() throws IOException {
|
|
||||||
Scan scan = new Scan();
|
|
||||||
scan.setMaxResultSize(1);
|
|
||||||
scan.setCaching(Integer.MAX_VALUE);
|
|
||||||
scan.setNeedCursorResult(true);
|
|
||||||
try (ResultScanner scanner = TABLE.getScanner(scan)) {
|
|
||||||
int num = 0;
|
|
||||||
Result r;
|
|
||||||
while ((r = scanner.next()) != null) {
|
|
||||||
|
|
||||||
if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS)-1) {
|
|
||||||
Assert.assertTrue(r.isCursor());
|
|
||||||
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getCursor().getRow());
|
|
||||||
} else {
|
|
||||||
Assert.assertFalse(r.isCursor());
|
|
||||||
Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow());
|
|
||||||
}
|
|
||||||
num++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in New Issue