HBASE-15325 ResultScanner allowing partial result will miss the rest of the row if the region is moved between two rpc requests (Phil Yang)

This commit is contained in:
tedyu 2016-03-17 11:03:29 -07:00
parent 2b8a7f8d7b
commit fbf58f330b
3 changed files with 297 additions and 8 deletions

View File

@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
@ -77,6 +78,10 @@ public abstract class ClientScanner extends AbstractClientScanner {
* via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
*/
protected byte[] partialResultsRow = null;
/**
* The last cell of a partially loaded (incomplete) row that has been added to the cache.
*/
protected Cell lastCellLoadedToCache = null;
protected final int caching;
protected long lastNext;
// Keep lastResult returned successfully in case we have to reset scanner.
@ -389,7 +394,9 @@ public abstract class ClientScanner extends AbstractClientScanner {
// We don't expect that the server will have more results for us if
// it doesn't tell us otherwise. We rely on the size or count of results
boolean serverHasMoreResults = false;
boolean allResultsSkipped = false;
do {
allResultsSkipped = false;
try {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
@ -448,10 +455,15 @@ public abstract class ClientScanner extends AbstractClientScanner {
// Reset the startRow to the row we've seen last so that the new scanner starts at
// the correct row. Otherwise we may see previously returned rows again.
// (ScannerCallable by now has "relocated" the correct region)
if (scan.isReversed()) {
scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) {
if (scan.isReversed()) {
scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
} else {
scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
}
} else {
scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
// We need to rescan this row because only part of it was loaded before.
scan.setStartRow(lastResult.getRow());
}
}
if (e instanceof OutOfOrderScannerNextException) {
@ -483,12 +495,27 @@ public abstract class ClientScanner extends AbstractClientScanner {
getResultsToAddToCache(values, callable.isHeartbeatMessage());
if (!resultsToAddToCache.isEmpty()) {
for (Result rs : resultsToAddToCache) {
rs = filterLoadedCell(rs);
if (rs == null) {
continue;
}
cache.add(rs);
long estimatedHeapSizeOfResult = calcEstimatedSize(rs);
countdown--;
remainingResultSize -= estimatedHeapSizeOfResult;
addEstimatedSize(estimatedHeapSizeOfResult);
this.lastResult = rs;
if (this.lastResult.isPartial() || scan.getBatch() > 0 ) {
updateLastCellLoadedToCache(this.lastResult);
} else {
this.lastCellLoadedToCache = null;
}
}
if (cache.isEmpty()) {
// All results have been seen before, so we need to scan more.
allResultsSkipped = true;
continue;
}
}
if (callable.isHeartbeatMessage()) {
@ -519,7 +546,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// !partialResults.isEmpty() means that we are still accumulating partial Results for a
// row. We should not change scanners before we receive all the partial Results for that
// row.
} while ((callable != null && callable.isHeartbeatMessage())
} while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage())
|| (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
&& (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
}
@ -783,4 +810,55 @@ public abstract class ClientScanner extends AbstractClientScanner {
}
return false;
}
/**
 * Remembers the final cell of {@code result} in {@link #lastCellLoadedToCache}.
 * A Result without any cells leaves the remembered cell unchanged.
 *
 * @param result the Result whose last cell should be remembered
 */
protected void updateLastCellLoadedToCache(Result result) {
  // Read the backing array once instead of calling rawCells() repeatedly.
  Cell[] cells = result.rawCells();
  // Result#rawCells() is documented as possibly returning null for an empty Result,
  // so treat null exactly like a zero-length array rather than throwing an NPE.
  if (cells == null || cells.length == 0) {
    return;
  }
  this.lastCellLoadedToCache = cells[cells.length - 1];
}
/**
 * Orders two Cells the way this scanner encounters them.
 * Rows are compared first (with the meta comparator when the current region is a meta
 * region) and the row ordering is negated for a reversed scan. Cells in the same row are
 * ordered by {@link CellComparator#compareWithoutRow(Cell, Cell)}, which is never negated
 * because a reversed scan only reverses rows, not columns.
 *
 * @param a left-hand cell
 * @param b right-hand cell
 * @return negative, zero or positive if {@code a} sorts before, equal to or after {@code b}
 */
private int compare(Cell a, Cell b) {
  final CellComparator rowComparator;
  if (currentRegion != null && currentRegion.isMetaRegion()) {
    rowComparator = CellComparator.META_COMPARATOR;
  } else {
    rowComparator = CellComparator.COMPARATOR;
  }
  int rowOrder = rowComparator.compareRows(a, b);
  if (rowOrder == 0) {
    // Same row: fall back to the non-row part of the key.
    return CellComparator.compareWithoutRow(a, b);
  }
  if (this.scan.isReversed()) {
    return -rowOrder;
  }
  return rowOrder;
}
/**
 * Removes from {@code result} the leading cells that are not after
 * {@link #lastCellLoadedToCache}, i.e. cells that were already added to the cache.
 *
 * @param result the Result about to be added to the cache
 * @return {@code result} itself when there is nothing to filter (no remembered cell, an
 *         empty Result, or the first cell is already after the remembered cell);
 *         {@code null} when every cell is at or before the remembered cell; otherwise a new
 *         Result holding only the cells after the remembered cell, carrying over the
 *         exists/stale/partial flags of {@code result}
 */
private Result filterLoadedCell(Result result) {
  // We only filter when the last result was partial, so lastCellLoadedToCache and result
  // normally share a row key. However, if 1) some cells are read; 1.1) the row is deleted
  // at the same time; 2) the region moves; 3) more cells are read, then
  // lastCellLoadedToCache and result will not be in the same row.
  Cell[] cells = result.rawCells();
  // Result#rawCells() is documented as possibly returning null for an empty Result;
  // handle that the same way as a zero-length array instead of throwing an NPE.
  if (lastCellLoadedToCache == null || cells == null || cells.length == 0) {
    return result;
  }
  if (compare(this.lastCellLoadedToCache, cells[0]) < 0) {
    // The first cell of this result is larger than the last cell loaded to the cache.
    // If the user does not allow partial results, this must be true.
    return result;
  }
  if (compare(this.lastCellLoadedToCache, cells[cells.length - 1]) >= 0) {
    // The last cell of this result is not larger than the last cell loaded to the cache,
    // so every cell has been seen before: skip all.
    return null;
  }

  // The first cell is known to be already loaded (checked above), so start at the second.
  // The previous check guarantees that some cell is after lastCellLoadedToCache, so the
  // loop stops before running off the end of the array.
  int index = 1;
  while (index < cells.length && compare(this.lastCellLoadedToCache, cells[index]) >= 0) {
    index++;
  }
  Cell[] remaining = Arrays.copyOfRange(cells, index, cells.length);
  return Result.create(remaining, result.getExists(), result.isStale(), result.isPartial());
}
}

View File

@ -378,7 +378,7 @@ public class CellComparator implements Comparator<Cell>, Serializable {
roffset, rlength);
}
private static int compareWithoutRow(final Cell left, final Cell right) {
public static int compareWithoutRow(final Cell left, final Cell right) {
// If the column is not specified, the "minimum" key type appears the
// latest in the sorted order, regardless of the timestamp. This is used
// for specifying the last key/value in a given row, because there is no

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
import org.apache.hadoop.hbase.filter.RandomRowFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -65,7 +67,7 @@ public class TestPartialResultsFromClientSide {
private static final Log LOG = LogFactory.getLog(TestPartialResultsFromClientSide.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static int MINICLUSTER_SIZE = 5;
private static Table TABLE = null;
/**
@ -99,7 +101,8 @@ public class TestPartialResultsFromClientSide {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.startMiniCluster(MINICLUSTER_SIZE);
TEST_UTIL.getAdmin().setBalancerRunning(false, true);
TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
}
@ -430,7 +433,7 @@ public class TestPartialResultsFromClientSide {
}
/**
* Test the method {@link Result#createCompleteResult(List, Result)}
* Test the method {@link Result#createCompleteResult(List)}
* @throws Exception
*/
@Test
@ -829,4 +832,212 @@ public class TestPartialResultsFromClientSide {
testEquivalenceOfScanResults(TABLE, partialScan, oneshotScan);
}
}
/**
 * Moves the single region of {@code table} to the region server at position {@code index}
 * in the mini cluster. Asserts that the table has exactly one region.
 *
 * @param table table whose only region is moved
 * @param index index of the destination region server within the mini cluster
 * @throws IOException if looking up the region or requesting the move fails
 */
private void moveRegion(Table table, int index) throws IOException {
  List<Pair<HRegionInfo, ServerName>> regionLocations =
      MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), table.getName());
  assertEquals(1, regionLocations.size());

  HRegionInfo onlyRegion = regionLocations.get(0).getFirst();
  ServerName destination =
      TEST_UTIL.getHBaseCluster().getRegionServer(index).getServerName();

  byte[] destinationName = Bytes.toBytes(destination.getServerName());
  TEST_UTIL.getAdmin().move(onlyRegion.getEncodedNameAsBytes(), destinationName);
}
/**
 * Asserts that {@code cell} has the given row, column family and qualifier, checked in
 * that order.
 *
 * @param cell cell to inspect
 * @param row  expected row key bytes
 * @param cf   expected column family bytes
 * @param cq   expected column qualifier bytes
 */
private void assertCell(Cell cell, byte[] row, byte[] cf, byte[] cq) {
  byte[] actualRow =
      Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
  assertArrayEquals(row, actualRow);

  byte[] actualFamily =
      Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
  assertArrayEquals(cf, actualFamily);

  byte[] actualQualifier = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
      cell.getQualifierLength());
  assertArrayEquals(cq, actualQualifier);
}
/**
 * Forward scan that allows partial results, with the region moved between calls to
 * {@code next()}. With a max result size of 1 each Result is expected to hold one cell,
 * and no cell may be lost or repeated across the region moves.
 *
 * @throws IOException if table creation, the region move or the scan fails
 */
@Test
public void testPartialResultWhenRegionMove() throws IOException {
  // Both the table and the scanner are closed even when an assertion fails.
  try (Table table = createTestTable(TableName.valueOf("testPartialResultWhenRegionMove"),
      ROWS, FAMILIES, QUALIFIERS, VALUE)) {
    moveRegion(table, 1);

    Scan scan = new Scan();
    scan.setMaxResultSize(1);
    scan.setAllowPartialResults(true);
    try (ResultScanner scanner = table.getScanner(scan)) {
      // Consume every cell of the first row except the last one.
      for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) {
        scanner.next();
      }

      // Last cell of the first row: the row is complete, so the Result is not partial.
      Result result1 = scanner.next();
      assertEquals(1, result1.rawCells().length);
      Cell c1 = result1.rawCells()[0];
      assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]);
      assertFalse(result1.isPartial());

      // Move at a row boundary: the scan must continue with the first cell of the next row.
      moveRegion(table, 2);

      Result result2 = scanner.next();
      assertEquals(1, result2.rawCells().length);
      Cell c2 = result2.rawCells()[0];
      assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]);
      assertTrue(result2.isPartial());

      // Move in the middle of a row: the scan must continue with the next cell of that row.
      moveRegion(table, 3);

      Result result3 = scanner.next();
      assertEquals(1, result3.rawCells().length);
      Cell c3 = result3.rawCells()[0];
      assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]);
      assertTrue(result3.isPartial());
    }
  }
}
/**
 * Reversed scan that allows partial results, with the region moved between calls to
 * {@code next()}. Rows are expected in reverse order starting at the last row, while the
 * cells inside each row keep their forward order.
 *
 * @throws IOException if table creation, the region move or the scan fails
 */
@Test
public void testReversedPartialResultWhenRegionMove() throws IOException {
  // Both the table and the scanner are closed even when an assertion fails.
  try (Table table =
      createTestTable(TableName.valueOf("testReversedPartialResultWhenRegionMove"),
          ROWS, FAMILIES, QUALIFIERS, VALUE)) {
    moveRegion(table, 1);

    Scan scan = new Scan();
    scan.setMaxResultSize(1);
    scan.setAllowPartialResults(true);
    scan.setReversed(true);
    try (ResultScanner scanner = table.getScanner(scan)) {
      // Consume every cell of the first returned row (the last row) except its last cell.
      for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) {
        scanner.next();
      }

      // Last cell of that row: the row is complete, so the Result is not partial.
      Result result1 = scanner.next();
      assertEquals(1, result1.rawCells().length);
      Cell c1 = result1.rawCells()[0];
      assertCell(c1, ROWS[NUM_ROWS - 1], FAMILIES[NUM_FAMILIES - 1],
          QUALIFIERS[NUM_QUALIFIERS - 1]);
      assertFalse(result1.isPartial());

      // Move at a row boundary: the scan must continue with the preceding row.
      moveRegion(table, 2);

      Result result2 = scanner.next();
      assertEquals(1, result2.rawCells().length);
      Cell c2 = result2.rawCells()[0];
      assertCell(c2, ROWS[NUM_ROWS - 2], FAMILIES[0], QUALIFIERS[0]);
      assertTrue(result2.isPartial());

      // Move in the middle of a row: the scan must continue with the next cell of that row.
      moveRegion(table, 3);

      Result result3 = scanner.next();
      assertEquals(1, result3.rawCells().length);
      Cell c3 = result3.rawCells()[0];
      assertCell(c3, ROWS[NUM_ROWS - 2], FAMILIES[0], QUALIFIERS[1]);
      assertTrue(result3.isPartial());
    }
  }
}
/**
 * Forward scan without partial results, with the region moved between calls to
 * {@code next()}. Every Result is expected to be a complete row, and consecutive rows must
 * be returned without gaps or repeats across the region moves.
 *
 * @throws IOException if table creation, the region move or the scan fails
 */
@Test
public void testCompleteResultWhenRegionMove() throws IOException {
  // Both the table and the scanner are closed even when an assertion fails.
  try (Table table = createTestTable(TableName.valueOf("testCompleteResultWhenRegionMove"),
      ROWS, FAMILIES, QUALIFIERS, VALUE)) {
    moveRegion(table, 1);

    Scan scan = new Scan();
    scan.setMaxResultSize(1);
    scan.setCaching(1);
    try (ResultScanner scanner = table.getScanner(scan)) {
      Result result1 = scanner.next();
      assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result1.rawCells().length);
      Cell c1 = result1.rawCells()[0];
      assertCell(c1, ROWS[0], FAMILIES[0], QUALIFIERS[0]);
      assertFalse(result1.isPartial());

      moveRegion(table, 2);

      Result result2 = scanner.next();
      assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result2.rawCells().length);
      Cell c2 = result2.rawCells()[0];
      assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]);
      assertFalse(result2.isPartial());

      moveRegion(table, 3);

      Result result3 = scanner.next();
      assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result3.rawCells().length);
      Cell c3 = result3.rawCells()[0];
      assertCell(c3, ROWS[2], FAMILIES[0], QUALIFIERS[0]);
      assertFalse(result3.isPartial());
    }
  }
}
/**
 * Reversed scan without partial results, with the region moved between calls to
 * {@code next()}. Every Result is expected to be a complete row, and rows must be returned
 * in reverse order without gaps or repeats across the region moves.
 *
 * @throws IOException if table creation, the region move or the scan fails
 */
@Test
public void testReversedCompleteResultWhenRegionMove() throws IOException {
  // Both the table and the scanner are closed even when an assertion fails.
  try (Table table =
      createTestTable(TableName.valueOf("testReversedCompleteResultWhenRegionMove"),
          ROWS, FAMILIES, QUALIFIERS, VALUE)) {
    moveRegion(table, 1);

    Scan scan = new Scan();
    scan.setMaxResultSize(1);
    scan.setCaching(1);
    scan.setReversed(true);
    try (ResultScanner scanner = table.getScanner(scan)) {
      Result result1 = scanner.next();
      assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result1.rawCells().length);
      Cell c1 = result1.rawCells()[0];
      assertCell(c1, ROWS[NUM_ROWS - 1], FAMILIES[0], QUALIFIERS[0]);
      assertFalse(result1.isPartial());

      moveRegion(table, 2);

      Result result2 = scanner.next();
      assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result2.rawCells().length);
      Cell c2 = result2.rawCells()[0];
      assertCell(c2, ROWS[NUM_ROWS - 2], FAMILIES[0], QUALIFIERS[0]);
      assertFalse(result2.isPartial());

      moveRegion(table, 3);

      Result result3 = scanner.next();
      assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result3.rawCells().length);
      Cell c3 = result3.rawCells()[0];
      assertCell(c3, ROWS[NUM_ROWS - 3], FAMILIES[0], QUALIFIERS[0]);
      assertFalse(result3.isPartial());
    }
  }
}
/**
 * Forward scan with a batch size of 1, with the region moved between calls to
 * {@code next()}. Each Result is expected to hold one cell, and no cell may be lost or
 * repeated across the region moves.
 *
 * @throws IOException if table creation, the region move or the scan fails
 */
@Test
public void testBatchingResultWhenRegionMove() throws IOException {
  // Both the table and the scanner are closed even when an assertion fails.
  try (Table table = createTestTable(TableName.valueOf("testBatchingResultWhenRegionMove"),
      ROWS, FAMILIES, QUALIFIERS, VALUE)) {
    moveRegion(table, 1);

    Scan scan = new Scan();
    scan.setCaching(1);
    scan.setBatch(1);
    try (ResultScanner scanner = table.getScanner(scan)) {
      // Consume every cell of the first row except the last one.
      for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) {
        scanner.next();
      }

      // Last cell of the first row.
      Result result1 = scanner.next();
      assertEquals(1, result1.rawCells().length);
      Cell c1 = result1.rawCells()[0];
      assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]);

      // Move at a row boundary: the scan must continue with the first cell of the next row.
      moveRegion(table, 2);

      Result result2 = scanner.next();
      assertEquals(1, result2.rawCells().length);
      Cell c2 = result2.rawCells()[0];
      assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]);

      // Move in the middle of a row: the scan must continue with the next cell of that row.
      moveRegion(table, 3);

      Result result3 = scanner.next();
      assertEquals(1, result3.rawCells().length);
      Cell c3 = result3.rawCells()[0];
      assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]);
    }
  }
}
}