From fbf58f330b7affa633513fd03076954f0d90c2fc Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 17 Mar 2016 11:03:29 -0700 Subject: [PATCH] HBASE-15325 ResultScanner allowing partial result will miss the rest of the row if the region is moved between two rpc requests (Phil Yang) --- .../hadoop/hbase/client/ClientScanner.java | 86 ++++++- .../apache/hadoop/hbase/CellComparator.java | 2 +- .../TestPartialResultsFromClientSide.java | 217 +++++++++++++++++- 3 files changed, 297 insertions(+), 8 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 22a56e355e2..3b6b83a3f7b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -77,6 +78,10 @@ public abstract class ClientScanner extends AbstractClientScanner { * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()} */ protected byte[] partialResultsRow = null; + /** + * The last cell from a not full Row which is added to cache + */ + protected Cell lastCellLoadedToCache = null; protected final int caching; protected long lastNext; // Keep lastResult returned successfully in case we have to reset scanner. @@ -389,7 +394,9 @@ public abstract class ClientScanner extends AbstractClientScanner { // We don't expect that the server will have more results for us if // it doesn't tell us otherwise. 
We rely on the size or count of results boolean serverHasMoreResults = false; + boolean allResultsSkipped = false; do { + allResultsSkipped = false; try { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just @@ -448,10 +455,15 @@ public abstract class ClientScanner extends AbstractClientScanner { // Reset the startRow to the row we've seen last so that the new scanner starts at // the correct row. Otherwise we may see previously returned rows again. // (ScannerCallable by now has "relocated" the correct region) - if (scan.isReversed()) { - scan.setStartRow(createClosestRowBefore(lastResult.getRow())); + if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) { + if (scan.isReversed()) { + scan.setStartRow(createClosestRowBefore(lastResult.getRow())); + } else { + scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1])); + } } else { - scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1])); + // We need to rescan this row because only part of it was loaded before. + scan.setStartRow(lastResult.getRow()); } } if (e instanceof OutOfOrderScannerNextException) { @@ -483,12 +495,27 @@ public abstract class ClientScanner extends AbstractClientScanner { getResultsToAddToCache(values, callable.isHeartbeatMessage()); if (!resultsToAddToCache.isEmpty()) { for (Result rs : resultsToAddToCache) { + rs = filterLoadedCell(rs); + if (rs == null) { + continue; + } + cache.add(rs); long estimatedHeapSizeOfResult = calcEstimatedSize(rs); countdown--; remainingResultSize -= estimatedHeapSizeOfResult; addEstimatedSize(estimatedHeapSizeOfResult); this.lastResult = rs; + if (this.lastResult.isPartial() || scan.getBatch() > 0 ) { + updateLastCellLoadedToCache(this.lastResult); + } else { + this.lastCellLoadedToCache = null; + } + } + if (cache.isEmpty()) { + // All results have been seen before; we need to scan more. 
+ allResultsSkipped = true; + continue; } } if (callable.isHeartbeatMessage()) { @@ -519,7 +546,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // !partialResults.isEmpty() means that we are still accumulating partial Results for a // row. We should not change scanners before we receive all the partial Results for that // row. - } while ((callable != null && callable.isHeartbeatMessage()) + } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage()) || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)))); } @@ -783,4 +810,55 @@ public abstract class ClientScanner extends AbstractClientScanner { } return false; } + + protected void updateLastCellLoadedToCache(Result result) { + if (result.rawCells().length == 0) { + return; + } + this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1]; + } + + /** + * Compares two Cells, taking a reversed scan into account. + * A reversed scan only reverses the order of rows, not of columns within a row. + */ + private int compare(Cell a, Cell b) { + CellComparator comparator = currentRegion != null && currentRegion.isMetaRegion() ? + CellComparator.META_COMPARATOR : CellComparator.COMPARATOR; + int r = comparator.compareRows(a, b); + if (r != 0) { + return this.scan.isReversed() ? -r : r; + } + return CellComparator.compareWithoutRow(a, b); + } + + private Result filterLoadedCell(Result result) { + // We only filter the result when the last result was partial, + // so lastCellLoadedToCache and result should normally have the same row key. + // However, if we 1) read some cells, 1.1) the row is deleted meanwhile, 2) the region moves, + // and 3) we read more cells, lastCellLoadedToCache and result will not be in the same row. 
+ if (lastCellLoadedToCache == null || result.rawCells().length == 0) { + return result; + } + if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) { + // The first cell of this result sorts after the last cell loaded to the cache. + // If the user does not allow partial results, this must be true. + return result; + } + if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) { + // The last cell of this result is not larger than the last cell loaded to the cache; skip all. + return null; + } + + // The first cell cannot be in the filtered result, so we start at the second. + int index = 1; + while (index < result.rawCells().length) { + if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) { + break; + } + index++; + } + Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length); + return Result.create(list, result.getExists(), result.isStale(), result.isPartial()); + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java index d869b3e75e7..a5e26cf1e1f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java @@ -378,7 +378,7 @@ public class CellComparator implements Comparator, Serializable { roffset, rlength); } - private static int compareWithoutRow(final Cell left, final Cell right) { + public static int compareWithoutRow(final Cell left, final Cell right) { // If the column is not specified, the "minimum" key type appears the // latest in the sorted order, regardless of the timestamp. 
This is used // for specifying the last key/value in a given row, because there is no diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index a6f8373eb0d..c6a2525f0fc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter; import org.apache.hadoop.hbase.filter.RandomRowFilter; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -65,7 +67,7 @@ public class TestPartialResultsFromClientSide { private static final Log LOG = LogFactory.getLog(TestPartialResultsFromClientSide.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - + private final static int MINICLUSTER_SIZE = 5; private static Table TABLE = null; /** @@ -99,7 +101,8 @@ public class TestPartialResultsFromClientSide { @BeforeClass public static void setUpBeforeClass() throws Exception { - TEST_UTIL.startMiniCluster(3); + TEST_UTIL.startMiniCluster(MINICLUSTER_SIZE); + TEST_UTIL.getAdmin().setBalancerRunning(false, true); TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); } @@ -430,7 +433,7 @@ public class TestPartialResultsFromClientSide { } /** - * Test the method {@link Result#createCompleteResult(List, Result)} + * Test the method {@link 
Result#createCompleteResult(List)} * @throws Exception */ @Test @@ -829,4 +832,212 @@ public class TestPartialResultsFromClientSide { testEquivalenceOfScanResults(TABLE, partialScan, oneshotScan); } } + + private void moveRegion(Table table, int index) throws IOException{ + List<Pair<HRegionInfo, ServerName>> regions = MetaTableAccessor + .getTableRegionsAndLocations(TEST_UTIL.getConnection(), + table.getName()); + assertEquals(1, regions.size()); + HRegionInfo regionInfo = regions.get(0).getFirst(); + ServerName name = TEST_UTIL.getHBaseCluster().getRegionServer(index).getServerName(); + TEST_UTIL.getAdmin().move(regionInfo.getEncodedNameAsBytes(), + Bytes.toBytes(name.getServerName())); + } + + private void assertCell(Cell cell, byte[] row, byte[] cf, byte[] cq) { + assertArrayEquals(row, + Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); + assertArrayEquals(cf, + Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())); + assertArrayEquals(cq, + Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); + } + + @Test + public void testPartialResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testPartialResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setAllowPartialResults(true); + ResultScanner scanner = table.getScanner(scan); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) { + scanner.next(); + } + Result result1 = scanner.next(); + assertEquals(1, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(1, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); + 
assertTrue(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(1, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]); + assertTrue(result3.isPartial()); + + } + + @Test + public void testReversedPartialResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testReversedPartialResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setAllowPartialResults(true); + scan.setReversed(true); + ResultScanner scanner = table.getScanner(scan); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS-1; i++) { + scanner.next(); + } + Result result1 = scanner.next(); + assertEquals(1, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(1, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]); + assertTrue(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(1, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[1]); + assertTrue(result3.isPartial()); + + } + + @Test + public void testCompleteResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testCompleteResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setCaching(1); + ResultScanner scanner = table.getScanner(scan); + + Result result1 = scanner.next(); + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result1.rawCells().length); + Cell c1 = 
result1.rawCells()[0]; + assertCell(c1, ROWS[0], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[2], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result3.isPartial()); + + } + + @Test + public void testReversedCompleteResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testReversedCompleteResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setCaching(1); + scan.setReversed(true); + ResultScanner scanner = table.getScanner(scan); + + Result result1 = scanner.next(); + assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[NUM_ROWS-3], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result3.isPartial()); + + } + + @Test + public void testBatchingResultWhenRegionMove() throws IOException { + Table table = + 
createTestTable(TableName.valueOf("testBatchingResultWhenRegionMove"), ROWS, FAMILIES, + QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setCaching(1); + scan.setBatch(1); + + ResultScanner scanner = table.getScanner(scan); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) { + scanner.next(); + } + Result result1 = scanner.next(); + assertEquals(1, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(1, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(1, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]); + } + + } \ No newline at end of file