HBASE-16984 Implement getScanner

This commit is contained in:
zhangduo 2016-11-25 13:15:04 +08:00
parent 86e17858f7
commit 01d3473bf0
17 changed files with 684 additions and 101 deletions

View File

@ -41,7 +41,7 @@ class AllowPartialScanResultCache implements ScanResultCache {
}
private void updateLastCell(Result result) {
lastCell = result.isPartial() ? result.rawCells()[result.rawCells().length - 1] : null;
lastCell = result.rawCells()[result.rawCells().length - 1];
}
@Override

View File

@ -49,7 +49,7 @@ class AsyncClientScanner {
// AsyncScanSingleRegionRpcRetryingCaller will modify this scan object directly.
private final Scan scan;
private final ScanResultConsumer consumer;
private final RawScanResultConsumer consumer;
private final TableName tableName;
@ -61,7 +61,7 @@ class AsyncClientScanner {
private final ScanResultCache resultCache;
public AsyncClientScanner(Scan scan, ScanResultConsumer consumer, TableName tableName,
public AsyncClientScanner(Scan scan, RawScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, long scanTimeoutNs, long rpcTimeoutNs) {
if (scan.getStartRow() == null) {
scan.setStartRow(EMPTY_START_ROW);

View File

@ -182,7 +182,7 @@ class AsyncRpcRetryingCallerFactory {
private ScanResultCache resultCache;
private ScanResultConsumer consumer;
private RawScanResultConsumer consumer;
private ClientService.Interface stub;
@ -207,7 +207,7 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public ScanSingleRegionCallerBuilder consumer(ScanResultConsumer consumer) {
public ScanSingleRegionCallerBuilder consumer(RawScanResultConsumer consumer) {
this.consumer = consumer;
return this;
}

View File

@ -75,7 +75,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private final ScanResultCache resultCache;
private final ScanResultConsumer consumer;
private final RawScanResultConsumer consumer;
private final ClientService.Interface stub;
@ -113,7 +113,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache,
ScanResultConsumer consumer, Interface stub, HRegionLocation loc, long pauseNs,
RawScanResultConsumer consumer, Interface stub, HRegionLocation loc, long pauseNs,
int maxRetries, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.scan = scan;
@ -246,7 +246,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private void updateNextStartRowWhenError(Result result) {
nextStartRowWhenError = result.getRow();
includeNextStartRowWhenError = result.isPartial();
includeNextStartRowWhenError = scan.getBatch() > 0 || result.isPartial();
}
private void completeWhenNoMoreResultsInRegion() {

View File

@ -30,4 +30,41 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface AsyncTable extends AsyncTableBase {
/**
* Gets a scanner on the current table for the given family.
* @param family The column family to scan.
* @return A scanner.
*/
default ResultScanner getScanner(byte[] family) {
return getScanner(new Scan().addFamily(family));
}
/**
* Gets a scanner on the current table for the given family and qualifier.
* @param family The column family to scan.
* @param qualifier The column qualifier to scan.
* @return A scanner.
*/
default ResultScanner getScanner(byte[] family, byte[] qualifier) {
return getScanner(new Scan().addColumn(family, qualifier));
}
/**
* Returns a scanner on the current table as specified by the {@link Scan} object.
* @param scan A configured {@link Scan} object.
* @return A scanner.
*/
ResultScanner getScanner(Scan scan);
/**
* The scan API uses the observer pattern. All results that match the given scan object will be
* passed to the given {@code consumer} by calling {@link ScanResultConsumer#onNext(Result)}.
* {@link ScanResultConsumer#onComplete()} means the scan is finished, and
* {@link ScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan
* is terminated.
* @param scan A configured {@link Scan} object.
* @param consumer the consumer used to receive results.
*/
void scan(Scan scan, ScanResultConsumer consumer);
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@ -26,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
* The implementation of AsyncTable. Based on {@link RawAsyncTable}.
@ -37,9 +39,12 @@ class AsyncTableImpl implements AsyncTable {
private final ExecutorService pool;
private final long defaultScannerMaxResultSize;
public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName, ExecutorService pool) {
this.rawTable = conn.getRawTable(tableName);
this.pool = pool;
this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
}
@Override
@ -156,4 +161,34 @@ class AsyncTableImpl implements AsyncTable {
public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) {
return wrap(rawTable.smallScan(scan, limit));
}
private long resultSize2CacheSize(long maxResultSize) {
// * 2 if possible
return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
}
@Override
public ResultScanner getScanner(Scan scan) {
return new AsyncTableResultScanner(rawTable, ReflectionUtils.newInstance(scan.getClass(), scan),
resultSize2CacheSize(
scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
}
private void scan0(Scan scan, ScanResultConsumer consumer) {
try (ResultScanner scanner = getScanner(scan)) {
for (Result result; (result = scanner.next()) != null;) {
if (!consumer.onNext(result)) {
break;
}
}
consumer.onComplete();
} catch (IOException e) {
consumer.onError(e);
}
}
@Override
public void scan(Scan scan, ScanResultConsumer consumer) {
pool.execute(() -> scan0(scan, consumer));
}
}

View File

@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
/**
* The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
* in background and cache it in memory. Typically the {@link #maxCacheSize} will be
* {@code 2 * scan.getMaxResultSize()}.
*/
@InterfaceAudience.Private
class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
private static final Log LOG = LogFactory.getLog(AsyncTableResultScanner.class);
private final RawAsyncTable rawTable;
private final Scan scan;
private final long maxCacheSize;
private final Queue<Result> queue = new ArrayDeque<>();
private long cacheSize;
private boolean closed = false;
private Throwable error;
private boolean prefetchStopped;
private int numberOfOnCompleteToIgnore;
// used to filter out cells that already returned when we restart a scan
private Cell lastCell;
private Function<byte[], byte[]> createClosestRow;
public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) {
this.rawTable = table;
this.scan = scan;
this.maxCacheSize = maxCacheSize;
this.createClosestRow = scan.isReversed() ? ConnectionUtils::createClosestRowBefore
: ConnectionUtils::createClosestRowAfter;
table.scan(scan, this);
}
private void addToCache(Result result) {
queue.add(result);
cacheSize += calcEstimatedSize(result);
}
private void stopPrefetch(Result lastResult) {
prefetchStopped = true;
if (lastResult.isPartial() || scan.getBatch() > 0) {
scan.setStartRow(lastResult.getRow());
lastCell = lastResult.rawCells()[lastResult.rawCells().length - 1];
} else {
scan.setStartRow(createClosestRow.apply(lastResult.getRow()));
}
if (LOG.isDebugEnabled()) {
LOG.debug(System.identityHashCode(this) + " stop prefetching when scanning "
+ rawTable.getName() + " as the cache size " + cacheSize
+ " is greater than the maxCacheSize + " + maxCacheSize + ", the next start row is "
+ Bytes.toStringBinary(scan.getStartRow()) + ", lastCell is " + lastCell);
}
// Ignore an onComplete call as the scan is stopped by us.
// Here we can not use a simple boolean flag. A scan operation can cross multiple regions and
// the regions may be located on different regionservers, so it is possible that the methods of
// RawScanResultConsumer are called in different rpc framework threads and overlapped with each
// other. It may happen that
// 1. we stop scan1
// 2. we start scan2
// 3. we stop scan2
// 4. onComplete for scan1 is called
// 5. onComplete for scan2 is called
// So if we use a boolean flag here then we can only ignore the onComplete in step4 and think
// that the onComplete in step 5 tells us there is no data.
numberOfOnCompleteToIgnore++;
}
@Override
public synchronized boolean onNext(Result[] results) {
assert results.length > 0;
if (closed) {
return false;
}
Result firstResult = results[0];
if (lastCell != null) {
firstResult = filterCells(firstResult, lastCell);
if (firstResult != null) {
// do not set lastCell to null if the result after filtering is null as there may still be
// other cells that can be filtered out
lastCell = null;
addToCache(firstResult);
} else if (results.length == 1) {
// the only one result is null
return true;
}
} else {
addToCache(firstResult);
}
for (int i = 1; i < results.length; i++) {
addToCache(results[i]);
}
notifyAll();
if (cacheSize < maxCacheSize) {
return true;
}
stopPrefetch(results[results.length - 1]);
return false;
}
@Override
public synchronized boolean onHeartbeat() {
return !closed;
}
@Override
public synchronized void onError(Throwable error) {
this.error = error;
}
@Override
public synchronized void onComplete() {
// Do not mark the scanner as closed if the scan is stopped by us due to cache size limit since
// we may resume later by starting a new scan. See resumePrefetch.
if (numberOfOnCompleteToIgnore > 0) {
numberOfOnCompleteToIgnore--;
return;
}
closed = true;
notifyAll();
}
private void resumePrefetch() {
if (LOG.isDebugEnabled()) {
LOG.debug(System.identityHashCode(this) + " resume prefetching");
}
prefetchStopped = false;
rawTable.scan(scan, this);
}
@Override
public synchronized Result next() throws IOException {
while (queue.isEmpty()) {
if (closed) {
return null;
}
if (error != null) {
Throwables.propagateIfPossible(error, IOException.class);
throw new IOException(error);
}
try {
wait();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
Result result = queue.poll();
cacheSize -= calcEstimatedSize(result);
if (prefetchStopped && cacheSize <= maxCacheSize / 2) {
resumePrefetch();
}
return result;
}
@Override
public synchronized void close() {
closed = true;
queue.clear();
cacheSize = 0;
notifyAll();
}
@Override
public boolean renewLease() {
// we will do prefetching in the background and if there is no space we will just terminate the
// background scan operation. So there is no reason to renew lease here.
return false;
}
}

View File

@ -40,9 +40,22 @@ class CompleteScanResultCache implements ScanResultCache {
}
private Result[] prependCombined(Result[] results, int length) throws IOException {
if (length == 0) {
return new Result[] { combine() };
}
// the last part of a partial result may not be marked as partial so here we need to check if
// there is a row change.
int start;
if (Bytes.equals(partialResults.get(0).getRow(), results[0].getRow())) {
partialResults.add(results[0]);
start = 1;
length--;
} else {
start = 0;
}
Result[] prependResults = new Result[length + 1];
prependResults[0] = combine();
System.arraycopy(results, 0, prependResults, 1, length);
System.arraycopy(results, start, prependResults, 1, length);
return prependResults;
}

View File

@ -29,10 +29,10 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
* NIO).
* <p>
* So, only experts that want to build high performance service should use this interface directly,
* especially for the {@link #scan(Scan, ScanResultConsumer)} below.
* especially for the {@link #scan(Scan, RawScanResultConsumer)} below.
* <p>
* TODO: For now the only difference between this interface and {@link AsyncTable} is the scan
* method. The {@link ScanResultConsumer} exposes the implementation details of a scan(heartbeat) so
* method. The {@link RawScanResultConsumer} exposes the implementation details of a scan(heartbeat) so
* it is not suitable for a normal user. If it is still the only difference after we implement most
* features of AsyncTable, we can think about merge these two interfaces.
*/
@ -42,11 +42,11 @@ public interface RawAsyncTable extends AsyncTableBase {
/**
* The basic scan API uses the observer pattern. All results that match the given scan object will
* be passed to the given {@code consumer} by calling {@link ScanResultConsumer#onNext(Result[])}.
* {@link ScanResultConsumer#onComplete()} means the scan is finished, and
* {@link ScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan
* is terminated. {@link ScanResultConsumer#onHeartbeat()} means the RS is still working but we
* can not get a valid result to call {@link ScanResultConsumer#onNext(Result[])}. This is usually
* be passed to the given {@code consumer} by calling {@link RawScanResultConsumer#onNext(Result[])}.
* {@link RawScanResultConsumer#onComplete()} means the scan is finished, and
* {@link RawScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan
* is terminated. {@link RawScanResultConsumer#onHeartbeat()} means the RS is still working but we
* can not get a valid result to call {@link RawScanResultConsumer#onNext(Result[])}. This is usually
* because the matched results are too sparse, for example, a filter which almost filters out
* everything is specified.
* <p>
@ -57,5 +57,5 @@ public interface RawAsyncTable extends AsyncTableBase {
* @param scan A configured {@link Scan} object.
* @param consumer the consumer used to receive results.
*/
void scan(Scan scan, ScanResultConsumer consumer);
void scan(Scan scan, RawScanResultConsumer consumer);
}

View File

@ -352,7 +352,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
.rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
}
public void scan(Scan scan, ScanResultConsumer consumer) {
public void scan(Scan scan, RawScanResultConsumer consumer) {
if (scan.isSmall()) {
if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
consumer.onError(

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
/**
* Receives {@link Result} for an asynchronous scan.
* <p>
* Notice that, the {@link #onNext(Result[])} method will be called in the thread which we send
* request to HBase service. So if you want the asynchronous scanner fetch data from HBase in
* background while you process the returned data, you need to move the processing work to another
* thread to make the {@code onNext} call return immediately. And please do NOT do any time
* consuming tasks in all methods below unless you know what you are doing.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface RawScanResultConsumer {
/**
* @param results the data fetched from HBase service.
* @return {@code false} if you want to terminate the scan process. Otherwise {@code true}
*/
boolean onNext(Result[] results);
/**
* Indicate that there is an heartbeat message but we have not cumulated enough cells to call
* onNext.
* <p>
* This method give you a chance to terminate a slow scan operation.
* @return {@code false} if you want to terminate the scan process. Otherwise {@code true}
*/
boolean onHeartbeat();
/**
* Indicate that we hit an unrecoverable error and the scan operation is terminated.
* <p>
* We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
*/
void onError(Throwable error);
/**
* Indicate that the scan operation is completed normally.
*/
void onComplete();
}

View File

@ -19,35 +19,19 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
/**
* Receives {@link Result} from an asynchronous scanner.
* <p>
* Notice that, the {@link #onNext(Result[])} method will be called in the thread which we send
* request to HBase service. So if you want the asynchronous scanner fetch data from HBase in
* background while you process the returned data, you need to move the processing work to another
* thread to make the {@code onNext} call return immediately. And please do NOT do any time
* consuming tasks in all methods below unless you know what you are doing.
* Receives {@link Result} for an asynchronous scan.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface ScanResultConsumer {
/**
* @param results the data fetched from HBase service.
* @return {@code false} if you want to stop the scanner process. Otherwise {@code true}
* @param result the data fetched from HBase service.
* @return {@code false} if you want to terminate the scan process. Otherwise {@code true}
*/
boolean onNext(Result[] results);
/**
* Indicate that there is an heartbeat message but we have not cumulated enough cells to call
* onNext.
* <p>
* This method give you a chance to terminate a slow scan operation.
* @return {@code false} if you want to stop the scanner process. Otherwise {@code true}
*/
boolean onHeartbeat();
boolean onNext(Result result);
/**
* Indicate that we hit an unrecoverable error and the scan operation is terminated.
@ -60,4 +44,5 @@ public interface ScanResultConsumer {
* Indicate that the scan operation is completed normally.
*/
void onComplete();
}
}

View File

@ -18,10 +18,15 @@
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -59,10 +64,9 @@ public abstract class AbstractTestAsyncTableScan {
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
List<CompletableFuture<?>> futures = new ArrayList<>();
IntStream.range(0, COUNT)
.forEach(i -> futures.add(table.put(
new Put(Bytes.toBytes(String.format("%03d", i))).addColumn(FAMILY, CQ1, Bytes.toBytes(i))
.addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))));
IntStream.range(0, COUNT).forEach(
i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
.addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))));
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
}
@ -76,6 +80,22 @@ public abstract class AbstractTestAsyncTableScan {
protected abstract List<Result> doScan(Scan scan) throws Exception;
private Result convertToPartial(Result result) {
return Result.create(result.rawCells(), result.getExists(), result.isStale(), true);
}
protected final List<Result> convertFromBatchResult(List<Result> results) {
assertTrue(results.size() % 2 == 0);
return IntStream.range(0, results.size() / 2).mapToObj(i -> {
try {
return Result.createCompleteResult(Arrays.asList(convertToPartial(results.get(2 * i)),
convertToPartial(results.get(2 * i + 1))));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).collect(Collectors.toList());
}
@Test
public void testScanAll() throws Exception {
List<Result> results = doScan(createScan());

View File

@ -17,23 +17,16 @@
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertTrue;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -41,35 +34,27 @@ import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
@Category({ MediumTests.class, ClientTests.class })
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
private static final class SimpleScanResultConsumer implements ScanResultConsumer {
private final Queue<Result> queue = new ArrayDeque<>();
private boolean finished;
private final List<Result> results = new ArrayList<>();
private Throwable error;
@Override
public synchronized boolean onNext(Result[] results) {
for (Result result : results) {
queue.offer(result);
}
notifyAll();
return true;
}
private boolean finished = false;
@Override
public boolean onHeartbeat() {
public synchronized boolean onNext(Result result) {
results.add(result);
return true;
}
@Override
public synchronized void onError(Throwable error) {
finished = true;
this.error = error;
finished = true;
notifyAll();
}
@ -79,21 +64,15 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
notifyAll();
}
public synchronized Result take() throws IOException, InterruptedException {
for (;;) {
if (!queue.isEmpty()) {
return queue.poll();
}
if (finished) {
if (error != null) {
Throwables.propagateIfPossible(error, IOException.class);
throw new IOException(error);
} else {
return null;
}
}
public synchronized List<Result> getAll() throws Exception {
while (!finished) {
wait();
}
if (error != null) {
Throwables.propagateIfPossible(error, Exception.class);
throw new Exception(error);
}
return results;
}
}
@ -103,7 +82,9 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
@Parameters
public static List<Object[]> params() {
return Arrays.asList(new Supplier<?>[] { TestAsyncTableScan::createNormalScan },
new Supplier<?>[] { TestAsyncTableScan::createBatchScan });
new Supplier<?>[] { TestAsyncTableScan::createBatchScan },
new Supplier<?>[] { TestAsyncTableScan::createSmallResultSizeScan },
new Supplier<?>[] { TestAsyncTableScan::createBatchSmallResultSizeScan });
}
private static Scan createNormalScan() {
@ -114,34 +95,30 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
return new Scan().setBatch(1);
}
// set a small result size for testing flow control
private static Scan createSmallResultSizeScan() {
return new Scan().setMaxResultSize(1);
}
private static Scan createBatchSmallResultSizeScan() {
return new Scan().setBatch(1).setMaxResultSize(1);
}
@Override
protected Scan createScan() {
return scanCreater.get();
}
private Result convertToPartial(Result result) {
return Result.create(result.rawCells(), result.getExists(), result.isStale(), true);
}
@Override
protected List<Result> doScan(Scan scan) throws Exception {
SimpleScanResultConsumer scanConsumer = new SimpleScanResultConsumer();
ASYNC_CONN.getRawTable(TABLE_NAME).scan(scan, scanConsumer);
List<Result> results = new ArrayList<>();
for (Result result; (result = scanConsumer.take()) != null;) {
results.add(result);
}
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
SimpleScanResultConsumer consumer = new SimpleScanResultConsumer();
table.scan(scan, consumer);
List<Result> results = consumer.getAll();
if (scan.getBatch() > 0) {
assertTrue(results.size() % 2 == 0);
return IntStream.range(0, results.size() / 2).mapToObj(i -> {
try {
return Result.createCompleteResult(Arrays.asList(convertToPartial(results.get(2 * i)),
convertToPartial(results.get(2 * i + 1))));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).collect(Collectors.toList());
results = convertFromBatchResult(results);
}
return results;
}
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
@Parameter
public Supplier<Scan> scanCreater;
@Parameters
public static List<Object[]> params() {
return Arrays.asList(new Supplier<?>[] { TestAsyncTableScanner::createNormalScan },
new Supplier<?>[] { TestAsyncTableScanner::createBatchScan },
new Supplier<?>[] { TestAsyncTableScanner::createSmallResultSizeScan },
new Supplier<?>[] { TestAsyncTableScanner::createBatchSmallResultSizeScan });
}
private static Scan createNormalScan() {
return new Scan();
}
private static Scan createBatchScan() {
return new Scan().setBatch(1);
}
// set a small result size for testing flow control
private static Scan createSmallResultSizeScan() {
return new Scan().setMaxResultSize(1);
}
private static Scan createBatchSmallResultSizeScan() {
return new Scan().setBatch(1).setMaxResultSize(1);
}
@Override
protected Scan createScan() {
return scanCreater.get();
}
@Override
protected List<Result> doScan(Scan scan) throws Exception {
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
List<Result> results = new ArrayList<>();
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result; (result = scanner.next()) != null;) {
results.add(result);
}
}
if (scan.getBatch() > 0) {
results = convertFromBatchResult(results);
}
return results;
}
}

View File

@ -139,8 +139,8 @@ public class TestCompleteResultScanResultCache {
assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
Result[] results = resultCache.addAndGet(new Result[] { nextResult1, nextToNextResult1 },
false);
Result[] results =
resultCache.addAndGet(new Result[] { nextResult1, nextToNextResult1 }, false);
assertEquals(2, results.length);
assertEquals(1, Bytes.toInt(results[0].getRow()));
assertEquals(2, results[0].rawCells().length);
@ -156,4 +156,28 @@ public class TestCompleteResultScanResultCache {
assertEquals(1, results[0].rawCells().length);
assertEquals(3, Bytes.toInt(results[0].getValue(CF, CQ1)));
}
@Test
public void testCombine4() throws IOException {
Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, false);
Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true);
Result nextResult2 = Result.create(Arrays.asList(createCell(2, CQ2)), null, false, false);
assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
Result[] results = resultCache.addAndGet(new Result[] { result2, nextResult1 }, false);
assertEquals(1, results.length);
assertEquals(1, Bytes.toInt(results[0].getRow()));
assertEquals(2, results[0].rawCells().length);
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
results = resultCache.addAndGet(new Result[] { nextResult2 }, false);
assertEquals(1, results.length);
assertEquals(2, Bytes.toInt(results[0].getRow()));
assertEquals(2, results[0].rawCells().length);
assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ1)));
assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ2)));
}
}

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
@Category({ MediumTests.class, ClientTests.class })
public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
private static final class SimpleRawScanResultConsumer implements RawScanResultConsumer {
private final Queue<Result> queue = new ArrayDeque<>();
private boolean finished;
private Throwable error;
@Override
public synchronized boolean onNext(Result[] results) {
for (Result result : results) {
queue.offer(result);
}
notifyAll();
return true;
}
@Override
public boolean onHeartbeat() {
return true;
}
@Override
public synchronized void onError(Throwable error) {
finished = true;
this.error = error;
notifyAll();
}
@Override
public synchronized void onComplete() {
finished = true;
notifyAll();
}
public synchronized Result take() throws IOException, InterruptedException {
for (;;) {
if (!queue.isEmpty()) {
return queue.poll();
}
if (finished) {
if (error != null) {
Throwables.propagateIfPossible(error, IOException.class);
throw new IOException(error);
} else {
return null;
}
}
wait();
}
}
}
@Parameter
public Supplier<Scan> scanCreater;
@Parameters
public static List<Object[]> params() {
return Arrays.asList(new Supplier<?>[] { TestRawAsyncTableScan::createNormalScan },
new Supplier<?>[] { TestRawAsyncTableScan::createBatchScan });
}
private static Scan createNormalScan() {
return new Scan();
}
private static Scan createBatchScan() {
return new Scan().setBatch(1);
}
@Override
protected Scan createScan() {
return scanCreater.get();
}
@Override
protected List<Result> doScan(Scan scan) throws Exception {
SimpleRawScanResultConsumer scanConsumer = new SimpleRawScanResultConsumer();
ASYNC_CONN.getRawTable(TABLE_NAME).scan(scan, scanConsumer);
List<Result> results = new ArrayList<>();
for (Result result; (result = scanConsumer.take()) != null;) {
results.add(result);
}
if (scan.getBatch() > 0) {
results = convertFromBatchResult(results);
}
return results;
}
}