HBASE-18485 Performance issue: ClientAsyncPrefetchScanner is slower than ClientSimpleScanner

This commit is contained in:
Guanghao Zhang 2017-08-04 23:10:18 +08:00
parent 69fddb58bd
commit be034c26b4
3 changed files with 64 additions and 124 deletions

View File

@ -18,15 +18,20 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -48,25 +53,18 @@ import org.apache.hadoop.hbase.util.Threads;
@InterfaceAudience.Private @InterfaceAudience.Private
public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
private static final int ESTIMATED_SINGLE_RESULT_SIZE = 1024; private long maxCacheSize;
private static final int DEFAULT_QUEUE_CAPACITY = 1024;
private int cacheCapacity;
private AtomicLong cacheSizeInBytes; private AtomicLong cacheSizeInBytes;
// exception queue (from prefetch to main scan execution) // exception queue (from prefetch to main scan execution)
private Queue<Exception> exceptionsQueue; private Queue<Exception> exceptionsQueue;
// prefetch runnable object to be executed asynchronously // prefetch thread to be executed asynchronously
private PrefetchRunnable prefetchRunnable; private Thread prefetcher;
// Boolean flag to ensure only a single prefetch is running (per scan)
// We use atomic boolean to allow multiple concurrent threads to
// consume records from the same cache, but still have a single prefetcher thread.
// For a single consumer thread this can be replaced with a native boolean.
private AtomicBoolean prefetchRunning;
// an attribute for synchronizing close between scanner and prefetch threads
private AtomicLong closingThreadId;
// used for testing // used for testing
private Consumer<Boolean> prefetchListener; private Consumer<Boolean> prefetchListener;
private static final int NO_THREAD = -1;
private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name, public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
@ -84,82 +82,56 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
@Override @Override
protected void initCache() { protected void initCache() {
// concurrent cache // concurrent cache
cacheCapacity = calcCacheCapacity(); maxCacheSize = resultSize2CacheSize(maxScannerResultSize);
cache = new LinkedBlockingQueue<>(); cache = new LinkedBlockingQueue<>();
cacheSizeInBytes = new AtomicLong(0); cacheSizeInBytes = new AtomicLong(0);
exceptionsQueue = new ConcurrentLinkedQueue<>(); exceptionsQueue = new ConcurrentLinkedQueue<>();
prefetchRunnable = new PrefetchRunnable(); prefetcher = new Thread(new PrefetchRunnable());
prefetchRunning = new AtomicBoolean(false); Threads.setDaemonThreadRunning(prefetcher, tableName + ".asyncPrefetcher");
closingThreadId = new AtomicLong(NO_THREAD); }
private long resultSize2CacheSize(long maxResultSize) {
// * 2 if possible
return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
} }
@Override @Override
public Result next() throws IOException { public Result next() throws IOException {
try { try {
boolean hasExecutedPrefetch = false; lock.lock();
do { while (cache.isEmpty()) {
handleException(); handleException();
if (this.closed) {
// If the scanner is closed and there's nothing left in the cache, next is a no-op.
if (getCacheCount() == 0 && this.closed) {
return null; return null;
} }
try {
if (prefetchCondition()) { notEmpty.await();
// run prefetch in the background only if no prefetch is already running } catch (InterruptedException e) {
if (!isPrefetchRunning()) { throw new InterruptedIOException("Interrupted when wait to load cache");
if (prefetchRunning.compareAndSet(false, true)) {
getPool().execute(prefetchRunnable);
hasExecutedPrefetch = true;
}
}
} }
}
while (isPrefetchRunning()) { Result result = pollCache();
// prefetch running or still pending if (prefetchCondition()) {
if (getCacheCount() > 0) { notFull.signalAll();
return pollCache(); }
} else { return result;
// (busy) wait for a record - sleep
Threads.sleep(1);
}
}
if (getCacheCount() > 0) {
return pollCache();
}
} while (!hasExecutedPrefetch);
// if we exhausted this scanner before calling close, write out the scan metrics
writeScanMetrics();
return null;
} finally { } finally {
lock.unlock();
handleException(); handleException();
} }
} }
@Override @Override
public void close() { public void close() {
if (!scanMetricsPublished) writeScanMetrics(); try {
closed = true; lock.lock();
if (!isPrefetchRunning()) { super.close();
if(closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) { closed = true;
super.close(); notFull.signalAll();
} notEmpty.signalAll();
} // else do nothing since the async prefetch still needs this resources } finally {
} lock.unlock();
@Override
public int getCacheCount() {
if(cache != null) {
int size = cache.size();
if(size > cacheCapacity) {
cacheCapacity = size;
}
return size;
} else {
return 0;
} }
} }
@ -182,44 +154,8 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
} }
} }
private boolean isPrefetchRunning() {
return prefetchRunning.get();
}
// double buffer - double cache size
private int calcCacheCapacity() {
int capacity = Integer.MAX_VALUE;
if(caching > 0 && caching < (Integer.MAX_VALUE /2)) {
capacity = caching * 2 + 1;
}
if(capacity == Integer.MAX_VALUE){
if(maxScannerResultSize != Integer.MAX_VALUE) {
capacity = (int) (maxScannerResultSize / ESTIMATED_SINGLE_RESULT_SIZE);
}
else {
capacity = DEFAULT_QUEUE_CAPACITY;
}
}
return Math.max(capacity, 1);
}
private boolean prefetchCondition() { private boolean prefetchCondition() {
return return cacheSizeInBytes.get() < maxCacheSize / 2;
(getCacheCount() < getCountThreshold()) &&
(maxScannerResultSize == Long.MAX_VALUE ||
getCacheSizeInBytes() < getSizeThreshold()) ;
}
private int getCountThreshold() {
return Math.max(cacheCapacity / 2, 1);
}
private long getSizeThreshold() {
return Math.max(maxScannerResultSize / 2, 1);
}
private long getCacheSizeInBytes() {
return cacheSizeInBytes.get();
} }
private Result pollCache() { private Result pollCache() {
@ -233,21 +169,22 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
@Override @Override
public void run() { public void run() {
boolean succeed = false; while (!closed) {
try { boolean succeed = false;
loadCache(); try {
succeed = true; lock.lock();
} catch (Exception e) { while (!prefetchCondition()) {
exceptionsQueue.add(e); notFull.await();
} finally { }
if (prefetchListener != null) { loadCache();
prefetchListener.accept(succeed); succeed = true;
} } catch (Exception e) {
prefetchRunning.set(false); exceptionsQueue.add(e);
if(closed) { } finally {
if (closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) { notEmpty.signalAll();
// close was waiting for the prefetch to end lock.unlock();
close(); if (prefetchListener != null) {
prefetchListener.accept(succeed);
} }
} }
} }

View File

@ -72,7 +72,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
protected Result lastResult = null; protected Result lastResult = null;
protected final long maxScannerResultSize; protected final long maxScannerResultSize;
private final ClusterConnection connection; private final ClusterConnection connection;
private final TableName tableName; protected final TableName tableName;
protected final int scannerTimeout; protected final int scannerTimeout;
protected boolean scanMetricsPublished = false; protected boolean scanMetricsPublished = false;
protected RpcRetryingCaller<Result[]> caller; protected RpcRetryingCaller<Result[]> caller;
@ -412,6 +412,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// This is possible if we just stopped at the boundary of a region in the previous call. // This is possible if we just stopped at the boundary of a region in the previous call.
if (callable == null) { if (callable == null) {
if (!moveToNextRegion()) { if (!moveToNextRegion()) {
closed = true;
return; return;
} }
} }
@ -478,7 +479,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
assert newLimit >= 0; assert newLimit >= 0;
scan.setLimit(newLimit); scan.setLimit(newLimit);
} }
if (scanExhausted(values)) { if (scan.getLimit() == 0 || scanExhausted(values)) {
closeScanner(); closeScanner();
closed = true; closed = true;
break; break;
@ -532,6 +533,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// we are done with the current region // we are done with the current region
if (regionExhausted) { if (regionExhausted) {
if (!moveToNextRegion()) { if (!moveToNextRegion()) {
closed = true;
break; break;
} }
} }

View File

@ -754,6 +754,7 @@ public class TestScannersFromClientSide {
result = Result.create(kvListScan); result = Result.create(kvListScan);
verifyResult(result, kvListExp, toLog, "Testing async scan"); verifyResult(result, kvListExp, toLog, "Testing async scan");
} }
TEST_UTIL.deleteTable(table); TEST_UTIL.deleteTable(table);
} }