HBASE-13374 Small scanners (with particular configurations) do not return all rows

Signed-off-by: Enis Soztutar <enis@apache.org>
This commit is contained in:
Jonathan Lawlor 2015-04-01 13:05:30 -07:00 committed by Enis Soztutar
parent fef8ae9c70
commit 057499474c
3 changed files with 91 additions and 24 deletions

View File

@ -49,7 +49,6 @@ import com.google.common.annotations.VisibleForTesting;
public class ClientSmallReversedScanner extends ReversedClientScanner { public class ClientSmallReversedScanner extends ReversedClientScanner {
private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class); private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
private ScannerCallableWithReplicas smallScanCallable = null; private ScannerCallableWithReplicas smallScanCallable = null;
private byte[] skipRowOfFirstResult = null;
private SmallScannerCallableFactory callableFactory; private SmallScannerCallableFactory callableFactory;
/** /**
@ -135,7 +134,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
// Where to start the next getter // Where to start the next getter
byte[] localStartKey; byte[] localStartKey;
int cacheNum = nbRows; int cacheNum = nbRows;
skipRowOfFirstResult = null; boolean regionChanged = true;
// if we're at end of table, close and return false to stop iterating // if we're at end of table, close and return false to stop iterating
if (this.currentRegion != null && currentRegionDone) { if (this.currentRegion != null && currentRegionDone) {
byte[] startKey = this.currentRegion.getStartKey(); byte[] startKey = this.currentRegion.getStartKey();
@ -154,9 +153,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
LOG.debug("Finished with region " + this.currentRegion); LOG.debug("Finished with region " + this.currentRegion);
} }
} else if (this.lastResult != null) { } else if (this.lastResult != null) {
localStartKey = this.lastResult.getRow(); regionChanged = false;
skipRowOfFirstResult = this.lastResult.getRow(); localStartKey = createClosestRowBefore(lastResult.getRow());
cacheNum++;
} else { } else {
localStartKey = this.scan.getStartRow(); localStartKey = this.scan.getStartRow();
} }
@ -170,7 +168,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(), getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller); getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
if (this.scanMetrics != null && skipRowOfFirstResult == null) { if (this.scanMetrics != null && regionChanged) {
this.scanMetrics.countOfRegions.incrementAndGet(); this.scanMetrics.countOfRegions.incrementAndGet();
} }
return true; return true;
@ -221,11 +219,6 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
if (values != null && values.length > 0) { if (values != null && values.length > 0) {
for (int i = 0; i < values.length; i++) { for (int i = 0; i < values.length; i++) {
Result rs = values[i]; Result rs = values[i];
if (i == 0 && this.skipRowOfFirstResult != null
&& Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
// Skip the first result
continue;
}
cache.add(rs); cache.add(rs);
// We don't make Iterator here // We don't make Iterator here
for (Cell cell : rs.rawCells()) { for (Cell cell : rs.rawCells()) {

View File

@ -53,9 +53,6 @@ import com.google.protobuf.ServiceException;
public class ClientSmallScanner extends ClientScanner { public class ClientSmallScanner extends ClientScanner {
private final Log LOG = LogFactory.getLog(this.getClass()); private final Log LOG = LogFactory.getLog(this.getClass());
private ScannerCallableWithReplicas smallScanCallable = null; private ScannerCallableWithReplicas smallScanCallable = null;
// When fetching results from server, skip the first result if it has the same
// row with this one
private byte[] skipRowOfFirstResult = null;
private SmallScannerCallableFactory callableFactory; private SmallScannerCallableFactory callableFactory;
/** /**
@ -144,7 +141,7 @@ public class ClientSmallScanner extends ClientScanner {
// Where to start the next getter // Where to start the next getter
byte[] localStartKey; byte[] localStartKey;
int cacheNum = nbRows; int cacheNum = nbRows;
skipRowOfFirstResult = null; boolean regionChanged = true;
// if we're at end of table, close and return false to stop iterating // if we're at end of table, close and return false to stop iterating
if (this.currentRegion != null && currentRegionDone) { if (this.currentRegion != null && currentRegionDone) {
byte[] endKey = this.currentRegion.getEndKey(); byte[] endKey = this.currentRegion.getEndKey();
@ -161,9 +158,8 @@ public class ClientSmallScanner extends ClientScanner {
LOG.trace("Finished with region " + this.currentRegion); LOG.trace("Finished with region " + this.currentRegion);
} }
} else if (this.lastResult != null) { } else if (this.lastResult != null) {
localStartKey = this.lastResult.getRow(); regionChanged = false;
skipRowOfFirstResult = this.lastResult.getRow(); localStartKey = Bytes.add(lastResult.getRow(), new byte[1]);
cacheNum++;
} else { } else {
localStartKey = this.scan.getStartRow(); localStartKey = this.scan.getStartRow();
} }
@ -175,7 +171,7 @@ public class ClientSmallScanner extends ClientScanner {
smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan, smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(), getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller); getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
if (this.scanMetrics != null && skipRowOfFirstResult == null) { if (this.scanMetrics != null && regionChanged) {
this.scanMetrics.countOfRegions.incrementAndGet(); this.scanMetrics.countOfRegions.incrementAndGet();
} }
return true; return true;
@ -269,11 +265,6 @@ public class ClientSmallScanner extends ClientScanner {
if (values != null && values.length > 0) { if (values != null && values.length > 0) {
for (int i = 0; i < values.length; i++) { for (int i = 0; i < values.length; i++) {
Result rs = values[i]; Result rs = values[i];
if (i == 0 && this.skipRowOfFirstResult != null
&& Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
// Skip the first result
continue;
}
cache.add(rs); cache.add(rs);
// We don't make Iterator here // We don't make Iterator here
for (Cell cell : rs.rawCells()) { for (Cell cell : rs.rawCells()) {

View File

@ -24,8 +24,10 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTestConst; import org.apache.hadoop.hbase.HTestConst;
@ -68,6 +70,8 @@ public class TestScannersFromClientSide {
*/ */
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);
} }
@ -170,6 +174,85 @@ public class TestScannersFromClientSide {
} }
@Test
public void testSmallScan() throws Exception {
TableName TABLE = TableName.valueOf("testSmallScan");
int numRows = 10;
byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
int numQualifiers = 10;
byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers);
Table ht = TEST_UTIL.createTable(TABLE, FAMILY);
Put put;
List<Put> puts = new ArrayList<Put>();
for (int row = 0; row < ROWS.length; row++) {
put = new Put(ROWS[row]);
for (int qual = 0; qual < QUALIFIERS.length; qual++) {
KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE);
put.add(kv);
}
puts.add(put);
}
ht.put(puts);
int expectedRows = numRows;
int expectedCols = numRows * numQualifiers;
// Test normal and reversed
testSmallScan(ht, true, expectedRows, expectedCols);
testSmallScan(ht, false, expectedRows, expectedCols);
}
/**
* Run through a variety of test configurations with a small scan
* @param table
* @param reversed
* @param rows
* @param columns
* @throws Exception
*/
public void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception {
Scan baseScan = new Scan();
baseScan.setReversed(reversed);
baseScan.setSmall(true);
Scan scan = new Scan(baseScan);
verifyExpectedCounts(table, scan, rows, columns);
scan = new Scan(baseScan);
scan.setMaxResultSize(1);
verifyExpectedCounts(table, scan, rows, columns);
scan = new Scan(baseScan);
scan.setMaxResultSize(1);
scan.setCaching(Integer.MAX_VALUE);
verifyExpectedCounts(table, scan, rows, columns);
}
private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount,
int expectedCellCount) throws Exception {
ResultScanner scanner = table.getScanner(scan);
int rowCount = 0;
int cellCount = 0;
Result r = null;
while ((r = scanner.next()) != null) {
rowCount++;
for (Cell c : r.rawCells()) {
cellCount++;
}
}
assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount,
expectedRowCount == rowCount);
assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount,
expectedCellCount == cellCount);
scanner.close();
}
/** /**
* Test from client side for get with maxResultPerCF set * Test from client side for get with maxResultPerCF set
* *