diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java new file mode 100644 index 00000000000..c4eb4783c7b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.util.Threads; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * ClientAsyncPrefetchScanner implements async scanner behaviour. + * Specifically, the cache used by this scanner is a concurrent queue which allows both + * the producer (hbase client) and consumer (application) to access the queue in parallel. + * The number of rows returned in a prefetch is defined by the caching factor and the result size + * factor. + * This class allocates a buffer cache, whose size is a function of both factors. + * The prefetch is invoked when the cache is half­filled, instead of waiting for it to be empty. + * This is defined in the method {@link ClientAsyncPrefetchScanner#prefetchCondition()}. + */ +@InterfaceAudience.Private +public class ClientAsyncPrefetchScanner extends ClientScanner { + + private static final int ESTIMATED_SINGLE_RESULT_SIZE = 1024; + private static final int DEFAULT_QUEUE_CAPACITY = 1024; + + private int cacheCapacity; + private AtomicLong cacheSizeInBytes; + // exception queue (from prefetch to main scan execution) + private Queue exceptionsQueue; + // prefetch runnable object to be executed asynchronously + private PrefetchRunnable prefetchRunnable; + // Boolean flag to ensure only a single prefetch is running (per scan) + // We use atomic boolean to allow multiple concurrent threads to + // consume records from the same cache, but still have a single prefetcher thread. + // For a single consumer thread this can be replace with a native boolean. + private AtomicBoolean prefetchRunning; + // an attribute for synchronizing close between scanner and prefetch threads + private AtomicLong closingThreadId; + private static final int NO_THREAD = -1; + + public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name, + ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcControllerFactory, ExecutorService pool, + int replicaCallTimeoutMicroSecondScan) throws IOException { + super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, + replicaCallTimeoutMicroSecondScan); + } + + @Override + protected void initCache() { + // concurrent cache + cacheCapacity = calcCacheCapacity(); + cache = new LinkedBlockingQueue(cacheCapacity); + cacheSizeInBytes = new AtomicLong(0); + exceptionsQueue = new ConcurrentLinkedQueue(); + prefetchRunnable = new PrefetchRunnable(); + prefetchRunning = new AtomicBoolean(false); + closingThreadId = new AtomicLong(NO_THREAD); + } + + @Override + public Result next() throws IOException { + + try { + handleException(); + + // If the scanner is closed and there's nothing left in the cache, next is a no-op. + if (getCacheCount() == 0 && this.closed) { + return null; + } + if (prefetchCondition()) { + // run prefetch in the background only if no prefetch is already running + if (!isPrefetchRunning()) { + if (prefetchRunning.compareAndSet(false, true)) { + getPool().execute(prefetchRunnable); + } + } + } + + while (isPrefetchRunning()) { + // prefetch running or still pending + if (getCacheCount() > 0) { + Result res = cache.poll(); + long estimatedSize = calcEstimatedSize(res); + addEstimatedSize(-estimatedSize); + return res; + } else { + // (busy) wait for a record - sleep + Threads.sleep(1); + } + } + + if (getCacheCount() > 0) { + return cache.poll(); + } + + // if we exhausted this scanner before calling close, write out the scan metrics + writeScanMetrics(); + return null; + } finally { + handleException(); + } + } + + @Override + public void close() { + if (!scanMetricsPublished) writeScanMetrics(); + closed = true; + if (!isPrefetchRunning()) { + if(closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) { + super.close(); + } + } // else do nothing since the async prefetch still needs this resources + } + + @Override + public int getCacheCount() { + if(cache != null) { + int size = cache.size(); + if(size > cacheCapacity) { + cacheCapacity = size; + } + return size; + } else { + return 0; + } + } + + @Override + protected void addEstimatedSize(long estimatedSize) { + cacheSizeInBytes.addAndGet(estimatedSize); + } + + private void handleException() throws IOException { + //The prefetch task running in the background puts any exception it + //catches into this exception queue. + // Rethrow the exception so the application can handle it. + while (!exceptionsQueue.isEmpty()) { + Exception first = exceptionsQueue.peek(); + first.printStackTrace(); + if (first instanceof IOException) { + throw (IOException) first; + } + throw (RuntimeException) first; + } + } + + private boolean isPrefetchRunning() { + return prefetchRunning.get(); + } + + // double buffer - double cache size + private int calcCacheCapacity() { + int capacity = Integer.MAX_VALUE; + if(caching > 0 && caching < (Integer.MAX_VALUE /2)) { + capacity = caching * 2 + 1; + } + if(capacity == Integer.MAX_VALUE){ + if(maxScannerResultSize != Integer.MAX_VALUE) { + capacity = (int) (maxScannerResultSize / ESTIMATED_SINGLE_RESULT_SIZE); + } + else { + capacity = DEFAULT_QUEUE_CAPACITY; + } + } + return capacity; + } + + private boolean prefetchCondition() { + return + (getCacheCount() < getCountThreshold()) && + (maxScannerResultSize == Long.MAX_VALUE || + getCacheSizeInBytes() < getSizeThreshold()) ; + } + + private int getCountThreshold() { + return cacheCapacity / 2 ; + } + + private long getSizeThreshold() { + return maxScannerResultSize / 2 ; + } + + private long getCacheSizeInBytes() { + return cacheSizeInBytes.get(); + } + + + private class PrefetchRunnable implements Runnable { + + @Override + public void run() { + try { + loadCache(); + } catch (Exception e) { + exceptionsQueue.add(e); + } finally { + prefetchRunning.set(false); + if(closed) { + if (closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) { + // close was waiting for the prefetch to end + close(); + } + } + } + } + + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 1ea87690747..566bf4b0aa5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -17,14 +17,7 @@ */ package org.apache.hadoop.hbase.client; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ExecutorService; - +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -45,7 +38,14 @@ import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.Bytes; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ExecutorService; /** * Implements the scanner interface for the HBase client. @@ -53,7 +53,7 @@ import com.google.common.annotations.VisibleForTesting; * through them all. */ @InterfaceAudience.Private -public class ClientScanner extends AbstractClientScanner { +public abstract class ClientScanner extends AbstractClientScanner { private static final Log LOG = LogFactory.getLog(ClientScanner.class); // A byte array in which all elements are the max byte, and it is used to // construct closest front row @@ -64,7 +64,7 @@ public class ClientScanner extends AbstractClientScanner { // wonky: e.g. if it splits on us. protected HRegionInfo currentRegion = null; protected ScannerCallableWithReplicas callable = null; - protected final LinkedList cache = new LinkedList(); + protected Queue cache; /** * A list of partial results that have been returned from the server. This list should only * contain results if this scanner does not have enough partial results to form the complete @@ -151,9 +151,12 @@ public class ClientScanner extends AbstractClientScanner { this.rpcControllerFactory = controllerFactory; this.conf = conf; + initCache(); initializeScannerInConstruction(); } + protected abstract void initCache(); + protected void initializeScannerInConstruction() throws IOException{ // initialize the scanner nextScanner(this.caching, false); @@ -342,8 +345,11 @@ public class ClientScanner extends AbstractClientScanner { scanMetricsPublished = true; } - @Override - public Result next() throws IOException { + protected void initSyncCache() { + cache = new LinkedList(); + } + + protected Result nextWithSyncCache() throws IOException { // If the scanner is closed and there's nothing left in the cache, next is a no-op. if (cache.size() == 0 && this.closed) { return null; @@ -370,6 +376,8 @@ public class ClientScanner extends AbstractClientScanner { * Contact the servers to load more {@link Result}s in the cache. */ protected void loadCache() throws IOException { + // check if scanner was closed during previous prefetch + if (closed) return; Result[] values = null; long remainingResultSize = maxScannerResultSize; int countdown = this.caching; @@ -482,11 +490,10 @@ public class ClientScanner extends AbstractClientScanner { if (!resultsToAddToCache.isEmpty()) { for (Result rs : resultsToAddToCache) { cache.add(rs); - // We don't make Iterator here - for (Cell cell : rs.rawCells()) { - remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); - } + long estimatedHeapSizeOfResult = calcEstimatedSize(rs); countdown--; + remainingResultSize -= estimatedHeapSizeOfResult; + addEstimatedSize(estimatedHeapSizeOfResult); this.lastResult = rs; } } @@ -532,6 +539,24 @@ public class ClientScanner extends AbstractClientScanner { return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults; } + protected long calcEstimatedSize(Result rs) { + long estimatedHeapSizeOfResult = 0; + // We don't make Iterator here + for (Cell cell : rs.rawCells()) { + estimatedHeapSizeOfResult += CellUtil.estimatedHeapSizeOf(cell); + } + return estimatedHeapSizeOfResult; + } + + protected void addEstimatedSize(long estimatedHeapSizeOfResult) { + return; + } + + @VisibleForTesting + public int getCacheCount() { + return cache != null ? cache.size() : 0; + } + /** * This method ensures all of our book keeping regarding partial results is kept up to date. This * method should be called once we know that the results we received back from the RPC request do diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java new file mode 100644 index 00000000000..3998ac0df9d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.concurrent.ExecutorService; + +/** + * ClientSimpleScanner implements a sync scanner behaviour. + * The cache is a simple list. + * The prefetch is invoked only when the application finished processing the entire cache. + */ +@InterfaceAudience.Private +public class ClientSimpleScanner extends ClientScanner { + + public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name, + ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcControllerFactory, ExecutorService pool, + int replicaCallTimeoutMicroSecondScan) throws IOException { + super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, + replicaCallTimeoutMicroSecondScan); + } + + @Override + protected void initCache() { + initSyncCache(); + } + + @Override + public Result next() throws IOException { + return nextWithSyncCache(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index e959e272b4b..f9bdd55d12c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -18,10 +18,8 @@ */ package org.apache.hadoop.hbase.client; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.concurrent.ExecutorService; - +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -39,8 +37,9 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.util.Bytes; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ServiceException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.ExecutorService; /** * Client scanner for small scan. Generally, only one RPC is called to fetch the @@ -50,7 +49,7 @@ import com.google.protobuf.ServiceException; * For small scan, it will get better performance than {@link ClientScanner} */ @InterfaceAudience.Private -public class ClientSmallScanner extends ClientScanner { +public class ClientSmallScanner extends ClientSimpleScanner { private static final Log LOG = LogFactory.getLog(ClientSmallScanner.class); private ScannerCallableWithReplicas smallScanCallable = null; private SmallScannerCallableFactory callableFactory; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 324fe617252..6ba0b877463 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -620,6 +620,11 @@ public class HTable implements HTableInterface { scan.setMaxResultSize(scannerMaxResultSize); } + Boolean async = scan.isAsyncPrefetch(); + if (async == null) { + async = tableConfiguration.isClientScannerAsyncPrefetch(); + } + if (scan.isReversed()) { if (scan.isSmall()) { return new ClientSmallReversedScanner(getConfiguration(), scan, getName(), @@ -637,9 +642,15 @@ public class HTable implements HTableInterface { this.connection, this.rpcCallerFactory, this.rpcControllerFactory, pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { - return new ClientScanner(getConfiguration(), scan, getName(), this.connection, - this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + if (async) { + return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection, + this.rpcCallerFactory, this.rpcControllerFactory, + pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + } else { + return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection, + this.rpcCallerFactory, this.rpcControllerFactory, + pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index ef4b89d6b46..dde82ba5447 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.util.ExceptionUtil; * A reversed client scanner which support backward scanning */ @InterfaceAudience.Private -public class ReversedClientScanner extends ClientScanner { +public class ReversedClientScanner extends ClientSimpleScanner { private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class); /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 3b6194f99b6..0b511505110 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -145,8 +145,23 @@ public class Scan extends Query { private Map> familyMap = new TreeMap>(Bytes.BYTES_COMPARATOR); private Boolean loadColumnFamiliesOnDemand = null; + private Boolean asyncPrefetch = null; /** + * Parameter name for client scanner sync/async prefetch toggle. + * When using async scanner, prefetching data from the server is done at the background. + * The parameter currently won't have any effect in the case that the user has set + * Scan#setSmall or Scan#setReversed + */ + public static final String HBASE_CLIENT_SCANNER_ASYNC_PREFETCH = + "hbase.client.scanner.async.prefetch"; + + /** + * Default value of {@link #HBASE_CLIENT_SCANNER_ASYNC_PREFETCH}. + */ + public static final boolean DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH = false; + + /** * Set it true for small scan to get better performance * * Small scan should use pread and big scan can use seek + read @@ -255,6 +270,7 @@ public class Scan extends Query { this.tr = get.getTimeRange(); this.familyMap = get.getFamilyMap(); this.getScan = true; + this.asyncPrefetch = false; this.consistency = get.getConsistency(); for (Map.Entry attr : get.getAttributesMap().entrySet()) { setAttribute(attr.getKey(), attr.getValue()); @@ -971,4 +987,16 @@ public class Scan extends Query { if (bytes == null) return null; return ProtobufUtil.toScanMetrics(bytes); } -} \ No newline at end of file + + + public Boolean isAsyncPrefetch() { + return asyncPrefetch; + } + + public Scan setAsyncPrefetch(boolean asyncPrefetch) { + this.asyncPrefetch = asyncPrefetch; + return this; + } + + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java index 901e86d3043..1113cfd517a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java @@ -43,7 +43,10 @@ public class TableConfiguration { private final int retries; private final int maxKeyValueSize; - /** + // toggle for async/sync prefetch + private final boolean clientScannerAsyncPrefetch; + + /** * Constructor * @param conf Configuration object */ @@ -73,6 +76,9 @@ public class TableConfiguration { this.retries = conf.getInt( HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.clientScannerAsyncPrefetch = conf.getBoolean( + Scan.HBASE_CLIENT_SCANNER_ASYNC_PREFETCH, Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH); + this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); } @@ -91,6 +97,7 @@ public class TableConfiguration { this.primaryCallTimeoutMicroSecond = 10000; this.replicaCallTimeoutMicroSecondScan = 1000000; this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; + this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH; this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; } @@ -129,4 +136,9 @@ public class TableConfiguration { public long getScannerMaxResultSize() { return scannerMaxResultSize; } + + public boolean isClientScannerAsyncPrefetch() { + return clientScannerAsyncPrefetch; + } + } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index a91def38b91..3f406dfd0b7 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.LinkedList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -121,6 +122,15 @@ public class TestClientScanner { public void setRpcFinished(boolean rpcFinished) { this.rpcFinished = rpcFinished; } + + @Override + protected void initCache() { + initSyncCache(); + } + + @Override public Result next() throws IOException { + return nextWithSyncCache(); + } } @Test diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java index 4611d08dfe1..082090ec2a3 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java @@ -17,23 +17,9 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -46,6 +32,16 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * Test the ClientSmallReversedScanner. */ @@ -178,7 +174,7 @@ public class TestClientSmallReversedScanner { csrs.loadCache(); - List results = csrs.cache; + Queue results = csrs.cache; Iterator iter = results.iterator(); assertEquals(3, results.size()); for (int i = 3; i >= 1 && iter.hasNext(); i--) { @@ -248,7 +244,7 @@ public class TestClientSmallReversedScanner { csrs.loadCache(); - List results = csrs.cache; + Queue results = csrs.cache; Iterator iter = results.iterator(); assertEquals(2, results.size()); for (int i = 3; i >= 2 && iter.hasNext(); i--) { @@ -264,7 +260,7 @@ public class TestClientSmallReversedScanner { csrs.loadCache(); assertEquals(1, results.size()); - Result result = results.get(0); + Result result = results.peek(); assertEquals("row1", new String(result.getRow(), StandardCharsets.UTF_8)); assertEquals(1, result.getMap().size()); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java index 90bf4bbd3ab..318fbe7d4dc 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java @@ -17,22 +17,9 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -45,6 +32,15 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * Test the ClientSmallScanner. */ @@ -176,10 +172,10 @@ public class TestClientSmallScanner { css.loadCache(); - List results = css.cache; + Queue results = css.cache; assertEquals(3, results.size()); for (int i = 1; i <= 3; i++) { - Result result = results.get(i - 1); + Result result = results.poll(); byte[] row = result.getRow(); assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); assertEquals(1, result.getMap().size()); @@ -243,10 +239,10 @@ public class TestClientSmallScanner { css.loadCache(); - List results = css.cache; + Queue results = css.cache; assertEquals(2, results.size()); for (int i = 1; i <= 2; i++) { - Result result = results.get(i - 1); + Result result = results.poll(); byte[] row = result.getRow(); assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); assertEquals(1, result.getMap().size()); @@ -258,7 +254,7 @@ public class TestClientSmallScanner { css.loadCache(); assertEquals(1, results.size()); - Result result = results.get(0); + Result result = results.peek(); assertEquals("row3", new String(result.getRow(), StandardCharsets.UTF_8)); assertEquals(1, result.getMap().size()); assertTrue(css.closed); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index fc04bf0674b..5afc22631d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -642,6 +642,55 @@ public class TestScannersFromClientSide { verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region"); } + /** + * Test from client side for async scan + * + * @throws Exception + */ + @Test + public void testAsyncScanner() throws Exception { + byte [] TABLE = Bytes.toBytes("testAsyncScan"); + byte [][] ROWS = HTestConst.makeNAscii(ROW, 2); + byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); + byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); + + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES); + + Put put; + Scan scan; + Result result; + boolean toLog = true; + List kvListExp, kvListScan; + + kvListExp = new ArrayList(); + + for (int r=0; r < ROWS.length; r++) { + put = new Put(ROWS[r]); + for (int c=0; c < FAMILIES.length; c++) { + for (int q=0; q < QUALIFIERS.length; q++) { + KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE); + put.add(kv); + kvListExp.add(kv); + } + } + ht.put(put); + } + + scan = new Scan(); + scan.setAsyncPrefetch(true); + ResultScanner scanner = ht.getScanner(scan); + kvListScan = new ArrayList(); + while ((result = scanner.next()) != null) { + for (Cell kv : result.listCells()) { + kvListScan.add(kv); + } + } + result = Result.create(kvListScan); + assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner); + verifyResult(result, kvListExp, toLog, "Testing async scan"); + + } + static void verifyResult(Result result, List expKvList, boolean toLog, String msg) {