HBASE-26765 Minor refactor of async scanning code (#4121)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Nick Dimiduk 2022-02-24 17:20:57 +01:00 committed by Nick Dimiduk
parent 4cdb380ccc
commit 82282a9f95
4 changed files with 22 additions and 19 deletions

View File

@ -340,7 +340,7 @@ public class AsyncConnectionImpl implements AsyncConnection {
public AsyncTable<ScanResultConsumer> build() { public AsyncTable<ScanResultConsumer> build() {
RawAsyncTableImpl rawTable = RawAsyncTableImpl rawTable =
new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool); return new AsyncTableImpl(rawTable, pool);
} }
}; };
} }

View File

@ -43,12 +43,11 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private @InterfaceAudience.Private
class AsyncTableImpl implements AsyncTable<ScanResultConsumer> { class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
private final AsyncTable<AdvancedScanResultConsumer> rawTable; private final RawAsyncTableImpl rawTable;
private final ExecutorService pool; private final ExecutorService pool;
AsyncTableImpl(AsyncConnectionImpl conn, AsyncTable<AdvancedScanResultConsumer> rawTable, AsyncTableImpl(RawAsyncTableImpl rawTable, ExecutorService pool) {
ExecutorService pool) {
this.rawTable = rawTable; this.rawTable = rawTable;
this.pool = pool; this.pool = pool;
} }

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -30,8 +31,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically * The {@link ResultScanner} implementation for {@link RawAsyncTableImpl}. It will fetch data
* in background and cache it in memory. Typically the {@link #maxCacheSize} will be * automatically in background and cache it in memory. Typically, the {@link #maxCacheSize} will be
* {@code 2 * scan.getMaxResultSize()}. * {@code 2 * scan.getMaxResultSize()}.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@ -39,7 +40,7 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
private static final Logger LOG = LoggerFactory.getLogger(AsyncTableResultScanner.class); private static final Logger LOG = LoggerFactory.getLogger(AsyncTableResultScanner.class);
private final AsyncTable<AdvancedScanResultConsumer> rawTable; private final TableName tableName;
private final long maxCacheSize; private final long maxCacheSize;
@ -57,12 +58,10 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
private ScanResumer resumer; private ScanResumer resumer;
public AsyncTableResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan, public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) {
long maxCacheSize) { this.tableName = tableName;
this.rawTable = table;
this.maxCacheSize = maxCacheSize; this.maxCacheSize = maxCacheSize;
this.scan = scan; this.scan = scan;
table.scan(scan, this);
} }
private void addToCache(Result result) { private void addToCache(Result result) {
@ -72,9 +71,10 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
private void stopPrefetch(ScanController controller) { private void stopPrefetch(ScanController controller) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(String.format("0x%x", System.identityHashCode(this)) + LOG.debug("{} stop prefetching when scanning {} as the cache size {}" +
" stop prefetching when scanning " + rawTable.getName() + " as the cache size " + " is greater than the maxCacheSize {}",
cacheSize + " is greater than the maxCacheSize " + maxCacheSize); String.format("0x%x", System.identityHashCode(this)), tableName, cacheSize,
maxCacheSize);
} }
resumer = controller.suspend(); resumer = controller.suspend();
} }
@ -138,7 +138,7 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
return null; return null;
} }
if (error != null) { if (error != null) {
FutureUtils.rethrow(error); throw FutureUtils.rethrow(error);
} }
try { try {
wait(); wait();

View File

@ -666,10 +666,14 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
} }
@Override @Override
public ResultScanner getScanner(Scan scan) { public AsyncTableResultScanner getScanner(Scan scan) {
return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan), final long maxCacheSize = resultSize2CacheSize(
resultSize2CacheSize( scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize);
scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize)); final Scan scanCopy = ReflectionUtils.newInstance(scan.getClass(), scan);
final AsyncTableResultScanner scanner =
new AsyncTableResultScanner(tableName, scanCopy, maxCacheSize);
scan(scan, scanner);
return scanner;
} }
@Override @Override