diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index e5448d9c10b..89b3afc1cde 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; @@ -144,7 +145,9 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final class ScanControllerImpl implements RawScanResultConsumer.ScanController { // Make sure the methods are only called in this thread. - private final Thread callerThread = Thread.currentThread(); + private final Thread callerThread; + + private final Optional cursor; // INITIALIZED -> SUSPENDED -> DESTROYED // INITIALIZED -> TERMINATED -> DESTROYED @@ -154,6 +157,12 @@ class AsyncScanSingleRegionRpcRetryingCaller { private ScanResumerImpl resumer; + public ScanControllerImpl(ScanResponse resp) { + callerThread = Thread.currentThread(); + cursor = resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) + : Optional.empty(); + } + private void preCheck() { Preconditions.checkState(Thread.currentThread() == callerThread, "The current thread is %s, expected thread is %s, " + @@ -184,6 +193,11 @@ class AsyncScanSingleRegionRpcRetryingCaller { this.state = ScanControllerState.DESTROYED; return state; } + + @Override + public Optional cursor() { + return cursor; + } } private enum ScanResumerState { @@ -479,7 +493,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { return; } - ScanControllerImpl scanController = new ScanControllerImpl(); + ScanControllerImpl scanController = new ScanControllerImpl(resp); if (results.length > 0) { updateNextStartRowWhenError(results[results.length - 1]); consumer.onNext(results, scanController); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java index 820960b1f91..54d4887f056 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Optional; + import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; @@ -47,14 +49,14 @@ public interface RawScanResultConsumer { } /** - * Used to suspend or stop a scan. + * Used to suspend or stop a scan, or get a scan cursor if available. *

- * Notice that, you should only call the methods below inside onNext or onHeartbeat method. A - * IllegalStateException will be thrown if you call them at other places. + * Notice that, you should only call the {@link #suspend()} or {@link #terminate()} inside onNext + * or onHeartbeat method. A IllegalStateException will be thrown if you call them at other places. *

- * You can only call one of the methods below, i.e., call suspend or terminate(of course you are - * free to not call them both), and the methods are not reentrant. A IllegalStateException will be - * thrown if you have already called one of the methods. + * You can only call one of the {@link #suspend()} and {@link #terminate()} methods(of course you + * are free to not call them both), and the methods are not reentrant. An IllegalStateException + * will be thrown if you have already called one of the methods. */ @InterfaceAudience.Public interface ScanController { @@ -75,6 +77,12 @@ public interface RawScanResultConsumer { * or you want to stop the scan in onHeartbeat method because it has spent too many time. */ void terminate(); + + /** + * Get the scan cursor if available. + * @return The scan cursor. + */ + Optional cursor(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index bc196a6c5ac..cc85ddb7c84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -18,10 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.com.google.common.cache.Cache; -import org.apache.hadoop.hbase.shaded.com.google.common.cache.CacheBuilder; - import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -119,6 +115,9 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.com.google.common.cache.Cache; +import org.apache.hadoop.hbase.shaded.com.google.common.cache.CacheBuilder; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; @@ -221,8 +220,6 @@ import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.zookeeper.KeeperException; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; - /** * Implements the regionserver RPC services. */ @@ -3108,9 +3105,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // Heartbeat messages occur when the time limit has been reached. builder.setHeartbeatMessage(timeLimitReached); if (timeLimitReached && rsh.needCursor) { - Cell readingCell = scannerContext.getPeekedCellInHeartbeat(); - if (readingCell != null ) { - builder.setCursor(ProtobufUtil.toCursor(readingCell)); + Cell cursorCell = scannerContext.getLastPeekedCell(); + if (cursorCell != null ) { + builder.setCursor(ProtobufUtil.toCursor(cursorCell)); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index 2bab82e51cc..bc7b59745fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver; import java.util.List; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -97,7 +95,7 @@ public class ScannerContext { boolean keepProgress; private static boolean DEFAULT_KEEP_PROGRESS = false; - private Cell peekedCellInHeartbeat = null; + private Cell lastPeekedCell = null; /** * Tracks the relevant server side metrics during scans. null when metrics should not be tracked @@ -333,12 +331,12 @@ public class ScannerContext { || checkTimeLimit(checkerScope); } - public Cell getPeekedCellInHeartbeat() { - return peekedCellInHeartbeat; + Cell getLastPeekedCell() { + return lastPeekedCell; } - public void setPeekedCellInHeartbeat(Cell peekedCellInHeartbeat) { - this.peekedCellInHeartbeat = peekedCellInHeartbeat; + void setLastPeekedCell(Cell lastPeekedCell) { + this.lastPeekedCell = lastPeekedCell; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 969d4859093..5286c399830 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -48,11 +48,10 @@ import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatc import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; - /** * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List<KeyValue> * for a single row. @@ -105,7 +104,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * KVs skipped via seeking to next row/column. TODO: estimate them? */ private long kvsScanned = 0; - private Cell prevCell = null; + protected Cell prevCell = null; private final long preadMaxBytes; private long bytesRead; @@ -593,7 +592,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner if ((kvsScanned % cellsPerHeartbeatCheck == 0)) { scannerContext.updateTimeProgress(); if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { - scannerContext.setPeekedCellInHeartbeat(prevCell); return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); } } @@ -605,6 +603,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner int cellSize = CellUtil.estimatedSerializedSizeOf(cell); bytesRead += cellSize; prevCell = cell; + scannerContext.setLastPeekedCell(cell); topChanged = false; ScanQueryMatcher.MatchCode qcode = matcher.match(cell); switch (qcode) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestScanCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestScanCursor.java new file mode 100644 index 00000000000..ffd8c01a6b0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestScanCursor.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTestConst; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public abstract class AbstractTestScanCursor { + + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + /** + * Table configuration + */ + protected static TableName TABLE_NAME = TableName.valueOf("TestScanCursor"); + + protected static int NUM_ROWS = 5; + protected static byte[] ROW = Bytes.toBytes("testRow"); + protected static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); + + protected static int NUM_FAMILIES = 2; + protected static byte[] FAMILY = Bytes.toBytes("testFamily"); + protected static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); + + protected static int NUM_QUALIFIERS = 2; + protected static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + protected static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); + + protected static int VALUE_SIZE = 10; + protected static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); + + protected static final int TIMEOUT = 4000; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, TIMEOUT); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, TIMEOUT); + + // Check the timeout condition after every cell + conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1); + TEST_UTIL.startMiniCluster(1); + + createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); + } + + private static void createTestTable(TableName name, byte[][] rows, byte[][] families, + byte[][] qualifiers, byte[] cellValue) throws IOException { + TEST_UTIL.createTable(name, families).put(createPuts(rows, families, qualifiers, cellValue)); + } + + private static List createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, + byte[] value) throws IOException { + List puts = new ArrayList<>(); + for (int row = 0; row < rows.length; row++) { + Put put = new Put(rows[row]); + for (int fam = 0; fam < families.length; fam++) { + for (int qual = 0; qual < qualifiers.length; qual++) { + KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); + put.add(kv); + } + } + puts.add(put); + } + return puts; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + public static final class SparseFilter extends FilterBase { + + private final boolean reversed; + + public SparseFilter(boolean reversed) { + this.reversed = reversed; + } + + @Override + public ReturnCode filterKeyValue(Cell v) throws IOException { + Threads.sleep(TIMEOUT / 2 + 100); + return Bytes.equals(CellUtil.cloneRow(v), ROWS[reversed ? 0 : NUM_ROWS - 1]) + ? ReturnCode.INCLUDE + : ReturnCode.SKIP; + } + + @Override + public byte[] toByteArray() throws IOException { + return reversed ? new byte[] { 1 } : new byte[] { 0 }; + } + + public static Filter parseFrom(final byte[] pbBytes) { + return new SparseFilter(pbBytes[0] != 0); + } + } + + protected Scan createScanWithSparseFilter() { + return new Scan().setMaxResultSize(Long.MAX_VALUE).setCaching(Integer.MAX_VALUE) + .setNeedCursorResult(true).setAllowPartialResults(true).setFilter(new SparseFilter(false)); + } + + protected Scan createReversedScanWithSparseFilter() { + return new Scan().setMaxResultSize(Long.MAX_VALUE).setCaching(Integer.MAX_VALUE) + .setReversed(true).setNeedCursorResult(true).setAllowPartialResults(true) + .setFilter(new SparseFilter(true)); + } + + protected Scan createScanWithSizeLimit() { + return new Scan().setMaxResultSize(1).setCaching(Integer.MAX_VALUE).setNeedCursorResult(true); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java new file mode 100644 index 00000000000..9caf942c64b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestRawAsyncScanCursor extends AbstractTestScanCursor { + + private void doTest(boolean reversed) + throws InterruptedException, ExecutionException, IOException { + CompletableFuture future = new CompletableFuture<>(); + try (AsyncConnection conn = + ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) { + RawAsyncTable table = conn.getRawTable(TABLE_NAME); + table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(), + new RawScanResultConsumer() { + + private int count; + + @Override + public void onHeartbeat(ScanController controller) { + int row = count / NUM_FAMILIES / NUM_QUALIFIERS; + if (reversed) { + row = NUM_ROWS - 1 - row; + } + try { + assertArrayEquals(ROWS[row], controller.cursor().get().getRow()); + count++; + } catch (Throwable e) { + future.completeExceptionally(e); + throw e; + } + } + + @Override + public void onNext(Result[] results, ScanController controller) { + try { + assertEquals(1, results.length); + assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS); + // we will always provide a scan cursor if time limit is reached. + if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) { + assertFalse(controller.cursor().isPresent()); + } else { + assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], + controller.cursor().get().getRow()); + } + assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow()); + count++; + } catch (Throwable e) { + future.completeExceptionally(e); + throw e; + } + } + + @Override + public void onError(Throwable error) { + future.completeExceptionally(error); + } + + @Override + public void onComplete() { + future.complete(null); + } + }); + future.get(); + } + } + + @Test + public void testHeartbeatWithSparseFilter() + throws IOException, InterruptedException, ExecutionException { + doTest(false); + } + + @Test + public void testHeartbeatWithSparseFilterReversed() + throws IOException, InterruptedException, ExecutionException { + doTest(true); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java new file mode 100644 index 00000000000..f7798f07b3b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestScanCursor extends AbstractTestScanCursor { + + @Test + public void testHeartbeatWithSparseFilter() throws Exception { + try (ResultScanner scanner = + TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSparseFilter())) { + int num = 0; + Result r; + while ((r = scanner.next()) != null) { + if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) { + Assert.assertTrue(r.isCursor()); + Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], + r.getCursor().getRow()); + } else { + Assert.assertFalse(r.isCursor()); + Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); + } + num++; + } + } + } + + @Test + public void testHeartbeatWithSparseFilterReversed() throws Exception { + try (ResultScanner scanner = TEST_UTIL.getConnection().getTable(TABLE_NAME) + .getScanner(createReversedScanWithSparseFilter())) { + int num = 0; + Result r; + while ((r = scanner.next()) != null) { + if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) { + Assert.assertTrue(r.isCursor()); + Assert.assertArrayEquals(ROWS[NUM_ROWS - 1 - num / NUM_FAMILIES / NUM_QUALIFIERS], + r.getCursor().getRow()); + } else { + Assert.assertFalse(r.isCursor()); + Assert.assertArrayEquals(ROWS[0], r.getRow()); + } + num++; + } + } + } + + @Test + public void testSizeLimit() throws IOException { + try (ResultScanner scanner = + TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSizeLimit())) { + int num = 0; + Result r; + while ((r = scanner.next()) != null) { + if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS) - 1) { + Assert.assertTrue(r.isCursor()); + Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], + r.getCursor().getRow()); + } else { + Assert.assertFalse(r.isCursor()); + Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); + } + num++; + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerCursor.java deleted file mode 100644 index e40b808f194..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerCursor.java +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTestConst; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterBase; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Threads; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category(MediumTests.class) -public class TestScannerCursor { - - private static final Log LOG = - LogFactory.getLog(TestScannerCursor.class); - - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - private static Table TABLE = null; - - /** - * Table configuration - */ - private static TableName TABLE_NAME = TableName.valueOf("TestScannerCursor"); - - private static int NUM_ROWS = 5; - private static byte[] ROW = Bytes.toBytes("testRow"); - private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); - - private static int NUM_FAMILIES = 2; - private static byte[] FAMILY = Bytes.toBytes("testFamily"); - private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); - - private static int NUM_QUALIFIERS = 2; - private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); - private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); - - private static int VALUE_SIZE = 10; - private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); - - private static final int TIMEOUT = 4000; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - - conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, TIMEOUT); - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, TIMEOUT); - - // Check the timeout condition after every cell - conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1); - TEST_UTIL.startMiniCluster(1); - - TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); - - } - - static Table createTestTable(TableName name, byte[][] rows, byte[][] families, - byte[][] qualifiers, byte[] cellValue) throws IOException { - Table ht = TEST_UTIL.createTable(name, families); - List puts = createPuts(rows, families, qualifiers, cellValue); - ht.put(puts); - return ht; - } - - static ArrayList createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, - byte[] value) throws IOException { - Put put; - ArrayList puts = new ArrayList<>(); - - for (int row = 0; row < rows.length; row++) { - put = new Put(rows[row]); - for (int fam = 0; fam < families.length; fam++) { - for (int qual = 0; qual < qualifiers.length; qual++) { - KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); - put.add(kv); - } - } - puts.add(put); - } - - return puts; - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - public static class SparseFilter extends FilterBase { - - @Override - public ReturnCode filterKeyValue(Cell v) throws IOException { - Threads.sleep(TIMEOUT / 2 + 100); - return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? ReturnCode.INCLUDE - : ReturnCode.SKIP; - } - - public static Filter parseFrom(final byte[] pbBytes) { - return new SparseFilter(); - } - } - - @Test - public void testHeartbeatWithSparseFilter() throws Exception { - Scan scan = new Scan(); - scan.setMaxResultSize(Long.MAX_VALUE); - scan.setCaching(Integer.MAX_VALUE); - scan.setNeedCursorResult(true); - scan.setAllowPartialResults(true); - scan.setFilter(new SparseFilter()); - try(ResultScanner scanner = TABLE.getScanner(scan)) { - int num = 0; - Result r; - while ((r = scanner.next()) != null) { - - if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) { - Assert.assertTrue(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getCursor().getRow()); - } else { - Assert.assertFalse(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); - } - num++; - } - } - } - - @Test - public void testSizeLimit() throws IOException { - Scan scan = new Scan(); - scan.setMaxResultSize(1); - scan.setCaching(Integer.MAX_VALUE); - scan.setNeedCursorResult(true); - try (ResultScanner scanner = TABLE.getScanner(scan)) { - int num = 0; - Result r; - while ((r = scanner.next()) != null) { - - if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS)-1) { - Assert.assertTrue(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getCursor().getRow()); - } else { - Assert.assertFalse(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); - } - num++; - } - } - } - -}