From bf62e30c183328fc72be23bf3921af3117cbe7df Mon Sep 17 00:00:00 2001 From: openinx Date: Wed, 19 Sep 2018 23:02:07 +0800 Subject: [PATCH] HBASE-21206 Scan with batch size may return incomplete cells --- .../hbase/regionserver/RSRpcServices.java | 18 ++++- .../hbase/client/TestFromClientSide3.java | 73 +++++++++++++++++++ 2 files changed, 88 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 7824084c5ac..8b36b905716 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2916,6 +2916,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, break; } } + } else if (!moreRows && !results.isEmpty()) { + // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of + // last result is false. Otherwise it's possible that: the first nextRaw returned + // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the + // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will + // return with moreRows=false, which means moreResultsInRegion would be false, it will + // be a contradictory state (HBASE-21206). + int lastIdx = results.size() - 1; + Result r = results.get(lastIdx); + if (r.mayHaveMoreCellsInRow()) { + results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false)); + } } boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS); boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS); @@ -2931,10 +2943,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // there are more values to be read server side. If there aren't more values, // marking it as a heartbeat is wasteful because the client will need to issue // another ScanRequest only to realize that they already have all the values - if (moreRows) { + if (moreRows && timeLimitReached) { // Heartbeat messages occur when the time limit has been reached. - builder.setHeartbeatMessage(timeLimitReached); - if (timeLimitReached && rsh.needCursor) { + builder.setHeartbeatMessage(true); + if (rsh.needCursor) { Cell cursorCell = scannerContext.getLastPeekedCell(); if (cursorCell != null) { builder.setCursor(ProtobufUtil.toCursor(cursorCell)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 515c989c274..560f5528850 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -32,6 +32,7 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import static junit.framework.Assert.assertFalse; import org.apache.commons.logging.Log; @@ -66,6 +67,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -801,4 +803,75 @@ public class TestFromClientSide3 { } } } + + private static byte[] generateHugeValue(int size) { + Random rand = ThreadLocalRandom.current(); + byte[] value = new byte[size]; + for (int i = 0; i < value.length; i++) { + value[i] = (byte) rand.nextInt(256); + } + return value; + } + + @Test + public void testScanWithBatchSizeReturnIncompleteCells() throws IOException { + TableName tableName = TableName.valueOf("testScanWithBatchSizeReturnIncompleteCells"); + HTableDescriptor hd = new HTableDescriptor(tableName); + HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(3); + hd.addFamily(hcd); + + Table table = TEST_UTIL.createTable(hd, null); + + Put put = new Put(ROW); + put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024)); + table.put(put); + + put = new Put(ROW); + put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024)); + table.put(put); + + for (int i = 2; i < 5; i++) { + for (int version = 0; version < 2; version++) { + put = new Put(ROW); + put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024)); + table.put(put); + } + } + + Scan scan = new Scan(); + scan.withStartRow(ROW).withStopRow(ROW).addFamily(FAMILY).setBatch(3) + .setMaxResultSize(4 * 1024 * 1024); + Result result; + try (ResultScanner scanner = table.getScanner(scan)) { + List list = new ArrayList<>(); + /* + * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The second + * scan rpc should return a result with 3 cells, because reach the batch limit = 3; The + * mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the + * moreResultsInRegion also would be false. Finally, the client should collect all the cells + * into two result: 2+3 -> 3+2; + */ + while ((result = scanner.next()) != null) { + list.add(result); + } + + Assert.assertEquals(2, list.size()); + Assert.assertEquals(3, list.get(0).size()); + Assert.assertEquals(2, list.get(1).size()); + } + + scan = new Scan(); + scan.withStartRow(ROW).withStopRow(ROW).addFamily(FAMILY).setBatch(2) + .setMaxResultSize(4 * 1024 * 1024); + try (ResultScanner scanner = table.getScanner(scan)) { + List list = new ArrayList<>(); + while ((result = scanner.next()) != null) { + list.add(result); + } + Assert.assertEquals(3, list.size()); + Assert.assertEquals(2, list.get(0).size()); + Assert.assertEquals(2, list.get(1).size()); + Assert.assertEquals(1, list.get(2).size()); + } + } }