From be034c26b4ba820651d3255798ef01dc958608d5 Mon Sep 17 00:00:00 2001
From: Guanghao Zhang
Date: Fri, 4 Aug 2017 23:10:18 +0800
Subject: [PATCH] HBASE-18485 Performance issue: ClientAsyncPrefetchScanner is
 slower than ClientSimpleScanner

---
 .../client/ClientAsyncPrefetchScanner.java | 181 ++++++------------
 .../hadoop/hbase/client/ClientScanner.java |   6 +-
 .../client/TestScannersFromClientSide.java |   1 +
 3 files changed, 64 insertions(+), 124 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
index 34c5620e5ad..e8da18f1f44 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
@@ -18,15 +18,20 @@
 package org.apache.hadoop.hbase.client;
 
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 
 import org.apache.hadoop.conf.Configuration;
@@ -48,25 +53,18 @@ import org.apache.hadoop.hbase.util.Threads;
 @InterfaceAudience.Private
 public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
 
-  private static final int ESTIMATED_SINGLE_RESULT_SIZE = 1024;
-  private static final int DEFAULT_QUEUE_CAPACITY = 1024;
-
-  private int cacheCapacity;
+  private long maxCacheSize;
   private AtomicLong cacheSizeInBytes;
   // exception queue (from prefetch to main scan execution)
   private Queue<Exception> exceptionsQueue;
-  // prefetch runnable object to be executed asynchronously
-  private PrefetchRunnable prefetchRunnable;
-  // Boolean flag to ensure only a single prefetch is running (per scan)
-  // We use atomic boolean to allow multiple concurrent threads to
-  // consume records from the same cache, but still have a single prefetcher thread.
-  // For a single consumer thread this can be replace with a native boolean.
-  private AtomicBoolean prefetchRunning;
-  // an attribute for synchronizing close between scanner and prefetch threads
-  private AtomicLong closingThreadId;
+  // prefetch thread to be executed asynchronously
+  private Thread prefetcher;
   // used for testing
   private Consumer<Boolean> prefetchListener;
-  private static final int NO_THREAD = -1;
+
+  private final Lock lock = new ReentrantLock();
+  private final Condition notEmpty = lock.newCondition();
+  private final Condition notFull = lock.newCondition();
 
   public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
       ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
@@ -84,82 +82,56 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
   @Override
   protected void initCache() {
     // concurrent cache
-    cacheCapacity = calcCacheCapacity();
+    maxCacheSize = resultSize2CacheSize(maxScannerResultSize);
     cache = new LinkedBlockingQueue<>();
     cacheSizeInBytes = new AtomicLong(0);
     exceptionsQueue = new ConcurrentLinkedQueue<>();
-    prefetchRunnable = new PrefetchRunnable();
-    prefetchRunning = new AtomicBoolean(false);
-    closingThreadId = new AtomicLong(NO_THREAD);
+    prefetcher = new Thread(new PrefetchRunnable());
+    Threads.setDaemonThreadRunning(prefetcher, tableName + ".asyncPrefetcher");
+  }
+
+  private long resultSize2CacheSize(long maxResultSize) {
+    // * 2 if possible
+    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
   }
 
   @Override
   public Result next() throws IOException {
     try {
-      boolean hasExecutedPrefetch = false;
-      do {
+      lock.lock();
+      while (cache.isEmpty()) {
         handleException();
-
-        // If the scanner is closed and there's nothing left in the cache, next is a no-op.
-        if (getCacheCount() == 0 && this.closed) {
+        if (this.closed) {
           return null;
         }
-
-        if (prefetchCondition()) {
-          // run prefetch in the background only if no prefetch is already running
-          if (!isPrefetchRunning()) {
-            if (prefetchRunning.compareAndSet(false, true)) {
-              getPool().execute(prefetchRunnable);
-              hasExecutedPrefetch = true;
-            }
-          }
+        try {
+          notEmpty.await();
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException("Interrupted when wait to load cache");
         }
+      }
-
-        while (isPrefetchRunning()) {
-          // prefetch running or still pending
-          if (getCacheCount() > 0) {
-            return pollCache();
-          } else {
-            // (busy) wait for a record - sleep
-            Threads.sleep(1);
-          }
-        }
-
-        if (getCacheCount() > 0) {
-          return pollCache();
-        }
-      } while (!hasExecutedPrefetch);
-
-      // if we exhausted this scanner before calling close, write out the scan metrics
-      writeScanMetrics();
-      return null;
+      Result result = pollCache();
+      if (prefetchCondition()) {
+        notFull.signalAll();
+      }
+      return result;
     } finally {
+      lock.unlock();
       handleException();
     }
   }
 
   @Override
   public void close() {
-    if (!scanMetricsPublished) writeScanMetrics();
-    closed = true;
-    if (!isPrefetchRunning()) {
-      if(closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) {
-        super.close();
-      }
-    } // else do nothing since the async prefetch still needs this resources
-  }
-
-  @Override
-  public int getCacheCount() {
-    if(cache != null) {
-      int size = cache.size();
-      if(size > cacheCapacity) {
-        cacheCapacity = size;
-      }
-      return size;
-    } else {
-      return 0;
+    try {
+      lock.lock();
+      super.close();
+      closed = true;
+      notFull.signalAll();
+      notEmpty.signalAll();
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -182,44 +154,8 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
     }
   }
 
-  private boolean isPrefetchRunning() {
-    return prefetchRunning.get();
-  }
-
-  // double buffer - double cache size
-  private int calcCacheCapacity() {
-    int capacity = Integer.MAX_VALUE;
-    if(caching > 0 && caching < (Integer.MAX_VALUE /2)) {
-      capacity = caching * 2 + 1;
-    }
-    if(capacity == Integer.MAX_VALUE){
-      if(maxScannerResultSize != Integer.MAX_VALUE) {
-        capacity = (int) (maxScannerResultSize / ESTIMATED_SINGLE_RESULT_SIZE);
-      }
-      else {
-        capacity = DEFAULT_QUEUE_CAPACITY;
-      }
-    }
-    return Math.max(capacity, 1);
-  }
-
   private boolean prefetchCondition() {
-    return
-      (getCacheCount() < getCountThreshold()) &&
-      (maxScannerResultSize == Long.MAX_VALUE ||
-      getCacheSizeInBytes() < getSizeThreshold()) ;
-  }
-
-  private int getCountThreshold() {
-    return Math.max(cacheCapacity / 2, 1);
-  }
-
-  private long getSizeThreshold() {
-    return Math.max(maxScannerResultSize / 2, 1);
-  }
-
-  private long getCacheSizeInBytes() {
-    return cacheSizeInBytes.get();
+    return cacheSizeInBytes.get() < maxCacheSize / 2;
   }
 
   private Result pollCache() {
@@ -233,21 +169,22 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
 
     @Override
     public void run() {
-      boolean succeed = false;
-      try {
-        loadCache();
-        succeed = true;
-      } catch (Exception e) {
-        exceptionsQueue.add(e);
-      } finally {
-        if (prefetchListener != null) {
-          prefetchListener.accept(succeed);
-        }
-        prefetchRunning.set(false);
-        if(closed) {
-          if (closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) {
-            // close was waiting for the prefetch to end
-            close();
+      while (!closed) {
+        boolean succeed = false;
+        try {
+          lock.lock();
+          while (!prefetchCondition()) {
+            notFull.await();
+          }
+          loadCache();
+          succeed = true;
+        } catch (Exception e) {
+          exceptionsQueue.add(e);
+        } finally {
+          notEmpty.signalAll();
+          lock.unlock();
+          if (prefetchListener != null) {
+            prefetchListener.accept(succeed);
           }
         }
       }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index d3b19e4844d..252243422a2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -72,7 +72,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
   protected Result lastResult = null;
   protected final long maxScannerResultSize;
   private final ClusterConnection connection;
-  private final TableName tableName;
+  protected final TableName tableName;
   protected final int scannerTimeout;
   protected boolean scanMetricsPublished = false;
   protected RpcRetryingCaller<Result[]> caller;
@@ -412,6 +412,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
     // This is possible if we just stopped at the boundary of a region in the previous call.
     if (callable == null) {
       if (!moveToNextRegion()) {
+        closed = true;
         return;
       }
     }
@@ -478,7 +479,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
         assert newLimit >= 0;
         scan.setLimit(newLimit);
       }
-      if (scanExhausted(values)) {
+      if (scan.getLimit() == 0 || scanExhausted(values)) {
         closeScanner();
         closed = true;
         break;
@@ -532,6 +533,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
       // we are done with the current region
       if (regionExhausted) {
         if (!moveToNextRegion()) {
+          closed = true;
           break;
         }
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index ef00b244091..43be5731d03 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -754,6 +754,7 @@ public class TestScannersFromClientSide {
       result = Result.create(kvListScan);
       verifyResult(result, kvListExp, toLog, "Testing async scan");
     }
+    TEST_UTIL.deleteTable(table);
   }
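
The heart of the change is replacing the AtomicBoolean flag plus Threads.sleep(1) busy-wait with a bounded producer/consumer hand-off built on a single ReentrantLock and two Conditions (notEmpty/notFull). A standalone, simplified sketch of that pattern follows; the class and its methods are illustrative only (they are not HBase APIs), and it bounds the cache by entry count where the patch bounds it by estimated size in bytes:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class PrefetchBufferSketch<T> {
  private final Queue<T> cache = new ArrayDeque<>();
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notEmpty = lock.newCondition();
  private final Condition notFull = lock.newCondition();
  private final int maxCacheSize;
  private boolean closed = false;

  public PrefetchBufferSketch(int maxCacheSize) {
    this.maxCacheSize = maxCacheSize;
  }

  /** Consumer side: roughly what next() does after this patch. */
  public T take() throws InterruptedException {
    lock.lock();
    try {
      while (cache.isEmpty()) {
        if (closed) {
          return null;          // nothing buffered and the scanner is done
        }
        notEmpty.await();       // park instead of busy-waiting
      }
      T item = cache.poll();
      if (cache.size() < maxCacheSize / 2) {
        notFull.signalAll();    // tell the prefetcher it may refill
      }
      return item;
    } finally {
      lock.unlock();
    }
  }

  /** Producer side: roughly one iteration of the prefetcher thread. */
  public void put(T item) throws InterruptedException {
    lock.lock();
    try {
      while (!closed && cache.size() >= maxCacheSize / 2) {
        notFull.await();        // back off once the buffer is half full
      }
      if (closed) {
        return;                 // drop the item; the consumer is gone
      }
      cache.add(item);
      notEmpty.signalAll();     // hand the result to the consumer
    } finally {
      lock.unlock();
    }
  }

  /** Shut down: wake both sides so neither stays parked forever. */
  public void close() {
    lock.lock();
    try {
      closed = true;
      notEmpty.signalAll();
      notFull.signalAll();
    } finally {
      lock.unlock();
    }
  }
}

The half-capacity threshold mirrors the new prefetchCondition(): the prefetcher is woken to refill once the cache drops below half of maxCacheSize, so loading the next batch overlaps with consumption of the current one.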
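
For context, and not part of the patch itself: the ClientAsyncPrefetchScanner code path measured in HBASE-18485 is only taken when the client opts into asynchronous prefetching on the Scan. A minimal usage sketch, assuming the Scan#setAsyncPrefetch(boolean) switch available on this branch and a made-up table name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class AsyncPrefetchScanExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("testTable"))) {
      Scan scan = new Scan();
      scan.setCaching(100);          // rows fetched per RPC
      scan.setAsyncPrefetch(true);   // selects ClientAsyncPrefetchScanner instead of ClientSimpleScanner
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
          // consume rows here while the prefetcher thread loads the next batch in the background
        }
      }
    }
  }
}

With this patch applied, the consuming loop above blocks on the notEmpty condition instead of busy-waiting in next(), which is what closes the performance gap against ClientSimpleScanner.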