diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java index ab2658755fb..caecfd437cd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java @@ -41,7 +41,7 @@ class AllowPartialScanResultCache implements ScanResultCache { } private void updateLastCell(Result result) { - lastCell = result.isPartial() ? result.rawCells()[result.rawCells().length - 1] : null; + lastCell = result.rawCells()[result.rawCells().length - 1]; } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index 504a44a1363..74c20ded181 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -49,7 +49,7 @@ class AsyncClientScanner { // AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly. private final Scan scan; - private final ScanResultConsumer consumer; + private final RawScanResultConsumer consumer; private final TableName tableName; @@ -61,7 +61,7 @@ class AsyncClientScanner { private final ScanResultCache resultCache; - public AsyncClientScanner(Scan scan, ScanResultConsumer consumer, TableName tableName, + public AsyncClientScanner(Scan scan, RawScanResultConsumer consumer, TableName tableName, AsyncConnectionImpl conn, long scanTimeoutNs, long rpcTimeoutNs) { if (scan.getStartRow() == null) { scan.setStartRow(EMPTY_START_ROW); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index d0bbcdb33cb..c40de31e866 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -182,7 +182,7 @@ class AsyncRpcRetryingCallerFactory { private ScanResultCache resultCache; - private ScanResultConsumer consumer; + private RawScanResultConsumer consumer; private ClientService.Interface stub; @@ -207,7 +207,7 @@ class AsyncRpcRetryingCallerFactory { return this; } - public ScanSingleRegionCallerBuilder consumer(ScanResultConsumer consumer) { + public ScanSingleRegionCallerBuilder consumer(RawScanResultConsumer consumer) { this.consumer = consumer; return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 0efac7fc159..ca83a515311 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -75,7 +75,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final ScanResultCache resultCache; - private final ScanResultConsumer consumer; + private final RawScanResultConsumer consumer; private final ClientService.Interface stub; @@ -113,7 +113,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache, - ScanResultConsumer consumer, Interface stub, HRegionLocation loc, long pauseNs, + RawScanResultConsumer consumer, Interface stub, HRegionLocation loc, long pauseNs, int maxRetries, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.scan = scan; @@ -246,7 +246,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private void updateNextStartRowWhenError(Result result) { nextStartRowWhenError = result.getRow(); - includeNextStartRowWhenError = result.isPartial(); + includeNextStartRowWhenError = scan.getBatch() > 0 || result.isPartial(); } private void completeWhenNoMoreResultsInRegion() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index e082d10a0c1..893beb93af7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -30,4 +30,41 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceAudience.Public @InterfaceStability.Unstable public interface AsyncTable extends AsyncTableBase { + + /** + * Gets a scanner on the current table for the given family. + * @param family The column family to scan. + * @return A scanner. + */ + default ResultScanner getScanner(byte[] family) { + return getScanner(new Scan().addFamily(family)); + } + + /** + * Gets a scanner on the current table for the given family and qualifier. + * @param family The column family to scan. + * @param qualifier The column qualifier to scan. + * @return A scanner. + */ + default ResultScanner getScanner(byte[] family, byte[] qualifier) { + return getScanner(new Scan().addColumn(family, qualifier)); + } + + /** + * Returns a scanner on the current table as specified by the {@link Scan} object. + * @param scan A configured {@link Scan} object. + * @return A scanner. + */ + ResultScanner getScanner(Scan scan); + + /** + * The scan API uses the observer pattern. All results that match the given scan object will be + * passed to the given {@code consumer} by calling {@link ScanResultConsumer#onNext(Result)}. + * {@link ScanResultConsumer#onComplete()} means the scan is finished, and + * {@link ScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan + * is terminated. + * @param scan A configured {@link Scan} object. + * @param consumer the consumer used to receive results. + */ + void scan(Scan scan, ScanResultConsumer consumer); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index 6e1233ddccd..cecf8150269 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -26,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.util.ReflectionUtils; /** * The implementation of AsyncTable. Based on {@link RawAsyncTable}. @@ -37,9 +39,12 @@ class AsyncTableImpl implements AsyncTable { private final ExecutorService pool; + private final long defaultScannerMaxResultSize; + public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName, ExecutorService pool) { this.rawTable = conn.getRawTable(tableName); this.pool = pool; + this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); } @Override @@ -156,4 +161,34 @@ class AsyncTableImpl implements AsyncTable { public CompletableFuture> smallScan(Scan scan, int limit) { return wrap(rawTable.smallScan(scan, limit)); } + + private long resultSize2CacheSize(long maxResultSize) { + // * 2 if possible + return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; + } + + @Override + public ResultScanner getScanner(Scan scan) { + return new AsyncTableResultScanner(rawTable, ReflectionUtils.newInstance(scan.getClass(), scan), + resultSize2CacheSize( + scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize)); + } + + private void scan0(Scan scan, ScanResultConsumer consumer) { + try (ResultScanner scanner = getScanner(scan)) { + for (Result result; (result = scanner.next()) != null;) { + if (!consumer.onNext(result)) { + break; + } + } + consumer.onComplete(); + } catch (IOException e) { + consumer.onError(e); + } + } + + @Override + public void scan(Scan scan, ScanResultConsumer consumer) { + pool.execute(() -> scan0(scan, consumer)); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java new file mode 100644 index 00000000000..cb8652e3256 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; +import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells; + +import com.google.common.base.Throwables; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.function.Function; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically + * in background and cache it in memory. Typically the {@link #maxCacheSize} will be + * {@code 2 * scan.getMaxResultSize()}. + */ +@InterfaceAudience.Private +class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { + + private static final Log LOG = LogFactory.getLog(AsyncTableResultScanner.class); + + private final RawAsyncTable rawTable; + + private final Scan scan; + + private final long maxCacheSize; + + private final Queue queue = new ArrayDeque<>(); + + private long cacheSize; + + private boolean closed = false; + + private Throwable error; + + private boolean prefetchStopped; + + private int numberOfOnCompleteToIgnore; + + // used to filter out cells that already returned when we restart a scan + private Cell lastCell; + + private Function createClosestRow; + + public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) { + this.rawTable = table; + this.scan = scan; + this.maxCacheSize = maxCacheSize; + this.createClosestRow = scan.isReversed() ? ConnectionUtils::createClosestRowBefore + : ConnectionUtils::createClosestRowAfter; + table.scan(scan, this); + } + + private void addToCache(Result result) { + queue.add(result); + cacheSize += calcEstimatedSize(result); + } + + private void stopPrefetch(Result lastResult) { + prefetchStopped = true; + if (lastResult.isPartial() || scan.getBatch() > 0) { + scan.setStartRow(lastResult.getRow()); + lastCell = lastResult.rawCells()[lastResult.rawCells().length - 1]; + } else { + scan.setStartRow(createClosestRow.apply(lastResult.getRow())); + } + if (LOG.isDebugEnabled()) { + LOG.debug(System.identityHashCode(this) + " stop prefetching when scanning " + + rawTable.getName() + " as the cache size " + cacheSize + + " is greater than the maxCacheSize + " + maxCacheSize + ", the next start row is " + + Bytes.toStringBinary(scan.getStartRow()) + ", lastCell is " + lastCell); + } + // Ignore an onComplete call as the scan is stopped by us. + // Here we can not use a simple boolean flag. A scan operation can cross multiple regions and + // the regions may be located on different regionservers, so it is possible that the methods of + // RawScanResultConsumer are called in different rpc framework threads and overlapped with each + // other. It may happen that + // 1. we stop scan1 + // 2. we start scan2 + // 3. we stop scan2 + // 4. onComplete for scan1 is called + // 5. onComplete for scan2 is called + // So if we use a boolean flag here then we can only ignore the onComplete in step4 and think + // that the onComplete in step 5 tells us there is no data. + numberOfOnCompleteToIgnore++; + } + + @Override + public synchronized boolean onNext(Result[] results) { + assert results.length > 0; + if (closed) { + return false; + } + Result firstResult = results[0]; + if (lastCell != null) { + firstResult = filterCells(firstResult, lastCell); + if (firstResult != null) { + // do not set lastCell to null if the result after filtering is null as there may still be + // other cells that can be filtered out + lastCell = null; + addToCache(firstResult); + } else if (results.length == 1) { + // the only one result is null + return true; + } + } else { + addToCache(firstResult); + } + for (int i = 1; i < results.length; i++) { + addToCache(results[i]); + } + notifyAll(); + if (cacheSize < maxCacheSize) { + return true; + } + stopPrefetch(results[results.length - 1]); + return false; + } + + @Override + public synchronized boolean onHeartbeat() { + return !closed; + } + + @Override + public synchronized void onError(Throwable error) { + this.error = error; + } + + @Override + public synchronized void onComplete() { + // Do not mark the scanner as closed if the scan is stopped by us due to cache size limit since + // we may resume later by starting a new scan. See resumePrefetch. + if (numberOfOnCompleteToIgnore > 0) { + numberOfOnCompleteToIgnore--; + return; + } + closed = true; + notifyAll(); + } + + private void resumePrefetch() { + if (LOG.isDebugEnabled()) { + LOG.debug(System.identityHashCode(this) + " resume prefetching"); + } + prefetchStopped = false; + rawTable.scan(scan, this); + } + + @Override + public synchronized Result next() throws IOException { + while (queue.isEmpty()) { + if (closed) { + return null; + } + if (error != null) { + Throwables.propagateIfPossible(error, IOException.class); + throw new IOException(error); + } + try { + wait(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + Result result = queue.poll(); + cacheSize -= calcEstimatedSize(result); + if (prefetchStopped && cacheSize <= maxCacheSize / 2) { + resumePrefetch(); + } + return result; + } + + @Override + public synchronized void close() { + closed = true; + queue.clear(); + cacheSize = 0; + notifyAll(); + } + + @Override + public boolean renewLease() { + // we will do prefetching in the background and if there is no space we will just terminate the + // background scan operation. So there is no reason to renew lease here. + return false; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java index 538aecb04bf..9dfb8f73143 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java @@ -40,9 +40,22 @@ class CompleteScanResultCache implements ScanResultCache { } private Result[] prependCombined(Result[] results, int length) throws IOException { + if (length == 0) { + return new Result[] { combine() }; + } + // the last part of a partial result may not be marked as partial so here we need to check if + // there is a row change. + int start; + if (Bytes.equals(partialResults.get(0).getRow(), results[0].getRow())) { + partialResults.add(results[0]); + start = 1; + length--; + } else { + start = 0; + } Result[] prependResults = new Result[length + 1]; prependResults[0] = combine(); - System.arraycopy(results, 0, prependResults, 1, length); + System.arraycopy(results, start, prependResults, 1, length); return prependResults; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java index 14184b0b66b..823367af1ba 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java @@ -29,10 +29,10 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; * NIO). *

* So, only experts that want to build high performance service should use this interface directly, - * especially for the {@link #scan(Scan, ScanResultConsumer)} below. + * especially for the {@link #scan(Scan, RawScanResultConsumer)} below. *

* TODO: For now the only difference between this interface and {@link AsyncTable} is the scan - * method. The {@link ScanResultConsumer} exposes the implementation details of a scan(heartbeat) so + * method. The {@link RawScanResultConsumer} exposes the implementation details of a scan(heartbeat) so * it is not suitable for a normal user. If it is still the only difference after we implement most * features of AsyncTable, we can think about merge these two interfaces. */ @@ -42,11 +42,11 @@ public interface RawAsyncTable extends AsyncTableBase { /** * The basic scan API uses the observer pattern. All results that match the given scan object will - * be passed to the given {@code consumer} by calling {@link ScanResultConsumer#onNext(Result[])}. - * {@link ScanResultConsumer#onComplete()} means the scan is finished, and - * {@link ScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan - * is terminated. {@link ScanResultConsumer#onHeartbeat()} means the RS is still working but we - * can not get a valid result to call {@link ScanResultConsumer#onNext(Result[])}. This is usually + * be passed to the given {@code consumer} by calling {@link RawScanResultConsumer#onNext(Result[])}. + * {@link RawScanResultConsumer#onComplete()} means the scan is finished, and + * {@link RawScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan + * is terminated. {@link RawScanResultConsumer#onHeartbeat()} means the RS is still working but we + * can not get a valid result to call {@link RawScanResultConsumer#onNext(Result[])}. This is usually * because the matched results are too sparse, for example, a filter which almost filters out * everything is specified. *

@@ -57,5 +57,5 @@ public interface RawAsyncTable extends AsyncTableBase { * @param scan A configured {@link Scan} object. * @param consumer the consumer used to receive results. */ - void scan(Scan scan, ScanResultConsumer consumer); + void scan(Scan scan, RawScanResultConsumer consumer); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index bf9a3f26fbd..cdc90ab6869 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -352,7 +352,7 @@ class RawAsyncTableImpl implements RawAsyncTable { .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call(); } - public void scan(Scan scan, ScanResultConsumer consumer) { + public void scan(Scan scan, RawScanResultConsumer consumer) { if (scan.isSmall()) { if (scan.getBatch() > 0 || scan.getAllowPartialResults()) { consumer.onError( diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java new file mode 100644 index 00000000000..7f0514c4c26 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Result; + +/** + * Receives {@link Result} for an asynchronous scan. + *

+ * Notice that, the {@link #onNext(Result[])} method will be called in the thread which we send + * request to HBase service. So if you want the asynchronous scanner fetch data from HBase in + * background while you process the returned data, you need to move the processing work to another + * thread to make the {@code onNext} call return immediately. And please do NOT do any time + * consuming tasks in all methods below unless you know what you are doing. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface RawScanResultConsumer { + + /** + * @param results the data fetched from HBase service. + * @return {@code false} if you want to terminate the scan process. Otherwise {@code true} + */ + boolean onNext(Result[] results); + + /** + * Indicate that there is an heartbeat message but we have not cumulated enough cells to call + * onNext. + *

+ * This method give you a chance to terminate a slow scan operation. + * @return {@code false} if you want to terminate the scan process. Otherwise {@code true} + */ + boolean onHeartbeat(); + + /** + * Indicate that we hit an unrecoverable error and the scan operation is terminated. + *

+ * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}. + */ + void onError(Throwable error); + + /** + * Indicate that the scan operation is completed normally. + */ + void onComplete(); +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java index 772a2fb6484..770a87fb407 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java @@ -19,35 +19,19 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.client.Result; /** - * Receives {@link Result} from an asynchronous scanner. - *

- * Notice that, the {@link #onNext(Result[])} method will be called in the thread which we send - * request to HBase service. So if you want the asynchronous scanner fetch data from HBase in - * background while you process the returned data, you need to move the processing work to another - * thread to make the {@code onNext} call return immediately. And please do NOT do any time - * consuming tasks in all methods below unless you know what you are doing. + * Receives {@link Result} for an asynchronous scan. */ @InterfaceAudience.Public @InterfaceStability.Unstable public interface ScanResultConsumer { /** - * @param results the data fetched from HBase service. - * @return {@code false} if you want to stop the scanner process. Otherwise {@code true} + * @param result the data fetched from HBase service. + * @return {@code false} if you want to terminate the scan process. Otherwise {@code true} */ - boolean onNext(Result[] results); - - /** - * Indicate that there is an heartbeat message but we have not cumulated enough cells to call - * onNext. - *

- * This method give you a chance to terminate a slow scan operation. - * @return {@code false} if you want to stop the scanner process. Otherwise {@code true} - */ - boolean onHeartbeat(); + boolean onNext(Result result); /** * Indicate that we hit an unrecoverable error and the scan operation is terminated. @@ -60,4 +44,5 @@ public interface ScanResultConsumer { * Indicate that the scan operation is completed normally. */ void onComplete(); -} \ No newline at end of file + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java index 220be104d43..30281118306 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java @@ -18,10 +18,15 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -59,10 +64,9 @@ public abstract class AbstractTestAsyncTableScan { ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME); List> futures = new ArrayList<>(); - IntStream.range(0, COUNT) - .forEach(i -> futures.add(table.put( - new Put(Bytes.toBytes(String.format("%03d", i))).addColumn(FAMILY, CQ1, Bytes.toBytes(i)) - .addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))))); + IntStream.range(0, COUNT).forEach( + i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i))) + .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))))); CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); } @@ -76,6 +80,22 @@ public abstract class AbstractTestAsyncTableScan { protected abstract List doScan(Scan scan) throws Exception; + private Result convertToPartial(Result result) { + return Result.create(result.rawCells(), result.getExists(), result.isStale(), true); + } + + protected final List convertFromBatchResult(List results) { + assertTrue(results.size() % 2 == 0); + return IntStream.range(0, results.size() / 2).mapToObj(i -> { + try { + return Result.createCompleteResult(Arrays.asList(convertToPartial(results.get(2 * i)), + convertToPartial(results.get(2 * i + 1)))); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).collect(Collectors.toList()); + } + @Test public void testScanAll() throws Exception { List results = doScan(createScan()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java index 65fb1ad4224..f151e832f4a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java @@ -17,23 +17,16 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertTrue; - import com.google.common.base.Throwables; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Queue; +import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -41,35 +34,27 @@ import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) -@Category({ MediumTests.class, ClientTests.class }) +@Category({ LargeTests.class, ClientTests.class }) public class TestAsyncTableScan extends AbstractTestAsyncTableScan { private static final class SimpleScanResultConsumer implements ScanResultConsumer { - private final Queue queue = new ArrayDeque<>(); - - private boolean finished; + private final List results = new ArrayList<>(); private Throwable error; - @Override - public synchronized boolean onNext(Result[] results) { - for (Result result : results) { - queue.offer(result); - } - notifyAll(); - return true; - } + private boolean finished = false; @Override - public boolean onHeartbeat() { + public synchronized boolean onNext(Result result) { + results.add(result); return true; } @Override public synchronized void onError(Throwable error) { - finished = true; this.error = error; + finished = true; notifyAll(); } @@ -79,21 +64,15 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan { notifyAll(); } - public synchronized Result take() throws IOException, InterruptedException { - for (;;) { - if (!queue.isEmpty()) { - return queue.poll(); - } - if (finished) { - if (error != null) { - Throwables.propagateIfPossible(error, IOException.class); - throw new IOException(error); - } else { - return null; - } - } + public synchronized List getAll() throws Exception { + while (!finished) { wait(); } + if (error != null) { + Throwables.propagateIfPossible(error, Exception.class); + throw new Exception(error); + } + return results; } } @@ -103,7 +82,9 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan { @Parameters public static List params() { return Arrays.asList(new Supplier[] { TestAsyncTableScan::createNormalScan }, - new Supplier[] { TestAsyncTableScan::createBatchScan }); + new Supplier[] { TestAsyncTableScan::createBatchScan }, + new Supplier[] { TestAsyncTableScan::createSmallResultSizeScan }, + new Supplier[] { TestAsyncTableScan::createBatchSmallResultSizeScan }); } private static Scan createNormalScan() { @@ -114,34 +95,30 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan { return new Scan().setBatch(1); } + // set a small result size for testing flow control + private static Scan createSmallResultSizeScan() { + return new Scan().setMaxResultSize(1); + } + + private static Scan createBatchSmallResultSizeScan() { + return new Scan().setBatch(1).setMaxResultSize(1); + } + @Override protected Scan createScan() { return scanCreater.get(); } - private Result convertToPartial(Result result) { - return Result.create(result.rawCells(), result.getExists(), result.isStale(), true); - } - @Override protected List doScan(Scan scan) throws Exception { - SimpleScanResultConsumer scanConsumer = new SimpleScanResultConsumer(); - ASYNC_CONN.getRawTable(TABLE_NAME).scan(scan, scanConsumer); - List results = new ArrayList<>(); - for (Result result; (result = scanConsumer.take()) != null;) { - results.add(result); - } + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + SimpleScanResultConsumer consumer = new SimpleScanResultConsumer(); + table.scan(scan, consumer); + List results = consumer.getAll(); if (scan.getBatch() > 0) { - assertTrue(results.size() % 2 == 0); - return IntStream.range(0, results.size() / 2).mapToObj(i -> { - try { - return Result.createCompleteResult(Arrays.asList(convertToPartial(results.get(2 * i)), - convertToPartial(results.get(2 * i + 1)))); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }).collect(Collectors.toList()); + results = convertFromBatchResult(results); } return results; } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java new file mode 100644 index 00000000000..006770d9156 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Supplier; + +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +@Category({ LargeTests.class, ClientTests.class }) +public class TestAsyncTableScanner extends AbstractTestAsyncTableScan { + + @Parameter + public Supplier scanCreater; + + @Parameters + public static List params() { + return Arrays.asList(new Supplier[] { TestAsyncTableScanner::createNormalScan }, + new Supplier[] { TestAsyncTableScanner::createBatchScan }, + new Supplier[] { TestAsyncTableScanner::createSmallResultSizeScan }, + new Supplier[] { TestAsyncTableScanner::createBatchSmallResultSizeScan }); + } + + private static Scan createNormalScan() { + return new Scan(); + } + + private static Scan createBatchScan() { + return new Scan().setBatch(1); + } + + // set a small result size for testing flow control + private static Scan createSmallResultSizeScan() { + return new Scan().setMaxResultSize(1); + } + + private static Scan createBatchSmallResultSizeScan() { + return new Scan().setBatch(1).setMaxResultSize(1); + } + + @Override + protected Scan createScan() { + return scanCreater.get(); + } + + @Override + protected List doScan(Scan scan) throws Exception { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + List results = new ArrayList<>(); + try (ResultScanner scanner = table.getScanner(scan)) { + for (Result result; (result = scanner.next()) != null;) { + results.add(result); + } + } + if (scan.getBatch() > 0) { + results = convertFromBatchResult(results); + } + return results; + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java index d688d0acdde..a340e9f901c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java @@ -139,8 +139,8 @@ public class TestCompleteResultScanResultCache { assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length); assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length); - Result[] results = resultCache.addAndGet(new Result[] { nextResult1, nextToNextResult1 }, - false); + Result[] results = + resultCache.addAndGet(new Result[] { nextResult1, nextToNextResult1 }, false); assertEquals(2, results.length); assertEquals(1, Bytes.toInt(results[0].getRow())); assertEquals(2, results[0].rawCells().length); @@ -156,4 +156,28 @@ public class TestCompleteResultScanResultCache { assertEquals(1, results[0].rawCells().length); assertEquals(3, Bytes.toInt(results[0].getValue(CF, CQ1))); } + + @Test + public void testCombine4() throws IOException { + Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true); + Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, false); + Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true); + Result nextResult2 = Result.create(Arrays.asList(createCell(2, CQ2)), null, false, false); + + assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length); + + Result[] results = resultCache.addAndGet(new Result[] { result2, nextResult1 }, false); + assertEquals(1, results.length); + assertEquals(1, Bytes.toInt(results[0].getRow())); + assertEquals(2, results[0].rawCells().length); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1))); + assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2))); + + results = resultCache.addAndGet(new Result[] { nextResult2 }, false); + assertEquals(1, results.length); + assertEquals(2, Bytes.toInt(results[0].getRow())); + assertEquals(2, results[0].rawCells().length); + assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ1))); + assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ2))); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java new file mode 100644 index 00000000000..1267d5f0e64 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.common.base.Throwables; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Queue; +import java.util.function.Supplier; + +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +@Category({ MediumTests.class, ClientTests.class }) +public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan { + + private static final class SimpleRawScanResultConsumer implements RawScanResultConsumer { + + private final Queue queue = new ArrayDeque<>(); + + private boolean finished; + + private Throwable error; + + @Override + public synchronized boolean onNext(Result[] results) { + for (Result result : results) { + queue.offer(result); + } + notifyAll(); + return true; + } + + @Override + public boolean onHeartbeat() { + return true; + } + + @Override + public synchronized void onError(Throwable error) { + finished = true; + this.error = error; + notifyAll(); + } + + @Override + public synchronized void onComplete() { + finished = true; + notifyAll(); + } + + public synchronized Result take() throws IOException, InterruptedException { + for (;;) { + if (!queue.isEmpty()) { + return queue.poll(); + } + if (finished) { + if (error != null) { + Throwables.propagateIfPossible(error, IOException.class); + throw new IOException(error); + } else { + return null; + } + } + wait(); + } + } + } + + @Parameter + public Supplier scanCreater; + + @Parameters + public static List params() { + return Arrays.asList(new Supplier[] { TestRawAsyncTableScan::createNormalScan }, + new Supplier[] { TestRawAsyncTableScan::createBatchScan }); + } + + private static Scan createNormalScan() { + return new Scan(); + } + + private static Scan createBatchScan() { + return new Scan().setBatch(1); + } + + @Override + protected Scan createScan() { + return scanCreater.get(); + } + + @Override + protected List doScan(Scan scan) throws Exception { + SimpleRawScanResultConsumer scanConsumer = new SimpleRawScanResultConsumer(); + ASYNC_CONN.getRawTable(TABLE_NAME).scan(scan, scanConsumer); + List results = new ArrayList<>(); + for (Result result; (result = scanConsumer.take()) != null;) { + results.add(result); + } + if (scan.getBatch() > 0) { + results = convertFromBatchResult(results); + } + return results; + } +}