HBASE-17376 ClientAsyncPrefetchScanner may fail due to too many rows (ChiaPing Tsai)

This commit is contained in:
tedyu 2016-12-26 15:55:22 -08:00
parent 463ffa792a
commit e18e9a22da
2 changed files with 69 additions and 26 deletions

View File

@ -76,7 +76,7 @@ public class ClientAsyncPrefetchScanner extends ClientScanner {
protected void initCache() { protected void initCache() {
// concurrent cache // concurrent cache
cacheCapacity = calcCacheCapacity(); cacheCapacity = calcCacheCapacity();
cache = new LinkedBlockingQueue<Result>(cacheCapacity); cache = new LinkedBlockingQueue<Result>();
cacheSizeInBytes = new AtomicLong(0); cacheSizeInBytes = new AtomicLong(0);
exceptionsQueue = new ConcurrentLinkedQueue<Exception>(); exceptionsQueue = new ConcurrentLinkedQueue<Exception>();
prefetchRunnable = new PrefetchRunnable(); prefetchRunnable = new PrefetchRunnable();

View File

@ -21,6 +21,8 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -41,6 +43,7 @@ import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -646,45 +649,72 @@ public class TestScannersFromClientSide {
verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region"); verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region");
} }
/**
* Test from client side for async scan
*
* @throws Exception
*/
@Test @Test
public void testAsyncScanner() throws Exception { public void testAsyncScannerWithSmallData() throws Exception {
TableName TABLE = TableName.valueOf("testAsyncScan"); testAsyncScanner(TableName.valueOf("testAsyncScannerWithSmallData"),
byte [][] ROWS = HTestConst.makeNAscii(ROW, 2); 2,
byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 3,
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); 10);
}
Table ht = TEST_UTIL.createTable(TABLE, FAMILIES); @Test
public void testAsyncScannerWithManyRows() throws Exception {
testAsyncScanner(TableName.valueOf("testAsyncScannerWithManyRows"),
30000,
1,
1);
}
private void testAsyncScanner(TableName table, int rowNumber, int familyNumber,
int qualifierNumber) throws Exception {
assert rowNumber > 0;
assert familyNumber > 0;
assert qualifierNumber > 0;
byte[] row = Bytes.toBytes("r");
byte[] family = Bytes.toBytes("f");
byte[] qualifier = Bytes.toBytes("q");
byte[][] rows = makeNAsciiWithZeroPrefix(row, rowNumber);
byte[][] families = makeNAsciiWithZeroPrefix(family, familyNumber);
byte[][] qualifiers = makeNAsciiWithZeroPrefix(qualifier, qualifierNumber);
Table ht = TEST_UTIL.createTable(table, families);
Put put;
Scan scan;
Result result;
boolean toLog = true; boolean toLog = true;
List<Cell> kvListExp, kvListScan; List<Cell> kvListExp = new ArrayList<>();
kvListExp = new ArrayList<Cell>(); List<Put> puts = new ArrayList<>();
for (byte[] r : rows) {
for (int r=0; r < ROWS.length; r++) { Put put = new Put(r);
put = new Put(ROWS[r]); for (byte[] f : families) {
for (int c=0; c < FAMILIES.length; c++) { for (byte[] q : qualifiers) {
for (int q=0; q < QUALIFIERS.length; q++) { KeyValue kv = new KeyValue(r, f, q, 1, VALUE);
KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE);
put.add(kv); put.add(kv);
kvListExp.add(kv); kvListExp.add(kv);
} }
} }
ht.put(put); puts.add(put);
if (puts.size() > 1000) {
ht.put(puts);
puts.clear();
}
}
if (!puts.isEmpty()) {
ht.put(puts);
puts.clear();
} }
scan = new Scan(); Scan scan = new Scan();
scan.setAsyncPrefetch(true); scan.setAsyncPrefetch(true);
ResultScanner scanner = ht.getScanner(scan); ResultScanner scanner = ht.getScanner(scan);
kvListScan = new ArrayList<Cell>(); List<Cell> kvListScan = new ArrayList<>();
Result result;
boolean first = true;
while ((result = scanner.next()) != null) { while ((result = scanner.next()) != null) {
// waiting for cache. see HBASE-17376
if (first) {
TimeUnit.SECONDS.sleep(1);
first = false;
}
for (Cell kv : result.listCells()) { for (Cell kv : result.listCells()) {
kvListScan.add(kv); kvListScan.add(kv);
} }
@ -692,7 +722,20 @@ public class TestScannersFromClientSide {
result = Result.create(kvListScan); result = Result.create(kvListScan);
assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner); assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner);
verifyResult(result, kvListExp, toLog, "Testing async scan"); verifyResult(result, kvListExp, toLog, "Testing async scan");
TEST_UTIL.deleteTable(table);
}
private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) {
int maxLength = Integer.toString(n).length();
byte [][] ret = new byte[n][];
for (int i = 0; i < n; i++) {
int length = Integer.toString(i).length();
StringBuilder buf = new StringBuilder(Integer.toString(i));
IntStream.range(0, maxLength - length).forEach(v -> buf.insert(0, "0"));
byte[] tail = Bytes.toBytes(buf.toString());
ret[i] = Bytes.add(base, tail);
}
return ret;
} }
static void verifyResult(Result result, List<Cell> expKvList, boolean toLog, static void verifyResult(Result result, List<Cell> expKvList, boolean toLog,