HBASE-17723 ClientAsyncPrefetchScanner may end prematurely when the size of the cache is one

This commit is contained in:
Chia-Ping Tsai 2017-03-12 13:48:12 +08:00
parent a49bc58a54
commit 0ecb678259
2 changed files with 89 additions and 40 deletions

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import java.io.IOException;
@ -26,6 +27,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
@ -62,6 +64,8 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
private AtomicBoolean prefetchRunning;
// an attribute for synchronizing close between scanner and prefetch threads
private AtomicLong closingThreadId;
// used for testing
private Consumer<Boolean> prefetchListener;
private static final int NO_THREAD = -1;
public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
@ -72,6 +76,11 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
replicaCallTimeoutMicroSecondScan);
}
@VisibleForTesting
void setPrefetchListener(Consumer<Boolean> prefetchListener) {
this.prefetchListener = prefetchListener;
}
@Override
protected void initCache() {
// concurrent cache
@ -88,34 +97,39 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
public Result next() throws IOException {
try {
handleException();
boolean hasExecutedPrefetch = false;
do {
handleException();
// If the scanner is closed and there's nothing left in the cache, next is a no-op.
if (getCacheCount() == 0 && this.closed) {
return null;
}
if (prefetchCondition()) {
// run prefetch in the background only if no prefetch is already running
if (!isPrefetchRunning()) {
if (prefetchRunning.compareAndSet(false, true)) {
getPool().execute(prefetchRunnable);
// If the scanner is closed and there's nothing left in the cache, next is a no-op.
if (getCacheCount() == 0 && this.closed) {
return null;
}
if (prefetchCondition()) {
// run prefetch in the background only if no prefetch is already running
if (!isPrefetchRunning()) {
if (prefetchRunning.compareAndSet(false, true)) {
getPool().execute(prefetchRunnable);
hasExecutedPrefetch = true;
}
}
}
while (isPrefetchRunning()) {
// prefetch running or still pending
if (getCacheCount() > 0) {
return pollCache();
} else {
// (busy) wait for a record - sleep
Threads.sleep(1);
}
}
}
while (isPrefetchRunning()) {
// prefetch running or still pending
if (getCacheCount() > 0) {
return pollCache();
} else {
// (busy) wait for a record - sleep
Threads.sleep(1);
}
}
if (getCacheCount() > 0) {
return pollCache();
}
} while (!hasExecutedPrefetch);
// if we exhausted this scanner before calling close, write out the scan metrics
writeScanMetrics();
@ -219,11 +233,16 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
@Override
public void run() {
boolean succeed = false;
try {
loadCache();
succeed = true;
} catch (Exception e) {
exceptionsQueue.add(e);
} finally {
if (prefetchListener != null) {
prefetchListener.accept(succeed);
}
prefetchRunning.set(false);
if(closed) {
if (closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) {

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.apache.commons.logging.Log;
@ -656,7 +657,9 @@ public class TestScannersFromClientSide {
testAsyncScanner(TableName.valueOf(name.getMethodName()),
2,
3,
10);
10,
-1,
null);
}
@Test
@ -664,11 +667,28 @@ public class TestScannersFromClientSide {
testAsyncScanner(TableName.valueOf(name.getMethodName()),
30000,
1,
1);
1,
-1,
null);
}
@Test
public void testAsyncScannerWithoutCaching() throws Exception {
testAsyncScanner(TableName.valueOf(name.getMethodName()),
5,
1,
1,
1,
(b) -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ex) {
}
});
}
private void testAsyncScanner(TableName table, int rowNumber, int familyNumber,
int qualifierNumber) throws Exception {
int qualifierNumber, int caching, Consumer<Boolean> listener) throws Exception {
assert rowNumber > 0;
assert familyNumber > 0;
assert qualifierNumber > 0;
@ -707,23 +727,33 @@ public class TestScannersFromClientSide {
Scan scan = new Scan();
scan.setAsyncPrefetch(true);
ResultScanner scanner = ht.getScanner(scan);
List<Cell> kvListScan = new ArrayList<>();
Result result;
boolean first = true;
while ((result = scanner.next()) != null) {
// waiting for cache. see HBASE-17376
if (first) {
TimeUnit.SECONDS.sleep(1);
first = false;
}
for (Cell kv : result.listCells()) {
kvListScan.add(kv);
}
if (caching > 0) {
scan.setCaching(caching);
}
try (ResultScanner scanner = ht.getScanner(scan)) {
assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner);
((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener);
List<Cell> kvListScan = new ArrayList<>();
Result result;
boolean first = true;
int actualRows = 0;
while ((result = scanner.next()) != null) {
++actualRows;
// waiting for cache. see HBASE-17376
if (first) {
TimeUnit.SECONDS.sleep(1);
first = false;
}
for (Cell kv : result.listCells()) {
kvListScan.add(kv);
}
}
assertEquals(rowNumber, actualRows);
// These cells may have different rows but it is ok. The Result#getRow
// isn't used in the verifyResult()
result = Result.create(kvListScan);
verifyResult(result, kvListExp, toLog, "Testing async scan");
}
result = Result.create(kvListScan);
assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner);
verifyResult(result, kvListExp, toLog, "Testing async scan");
TEST_UTIL.deleteTable(table);
}