HBASE-21206 Scan with batch size may return incomplete cells

This commit is contained in:
openinx 2018-09-18 16:45:45 +08:00
parent c5af7b654b
commit 5a73a1ab25
2 changed files with 91 additions and 4 deletions

View File

@ -3168,6 +3168,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
break;
}
}
} else if (!moreRows && !results.isEmpty()) {
// No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
// last result is false. Otherwise it's possible that: the first nextRaw returned
// because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
// last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
// return with moreRows=false, which means moreResultsInRegion would be false, it will
// be a contradictory state (HBASE-21206).
int lastIdx = results.size() - 1;
Result r = results.get(lastIdx);
if (r.mayHaveMoreCellsInRow()) {
results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
}
}
boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
@ -3183,12 +3195,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// there are more values to be read server side. If there aren't more values,
// marking it as a heartbeat is wasteful because the client will need to issue
// another ScanRequest only to realize that they already have all the values
if (moreRows) {
if (moreRows && timeLimitReached) {
// Heartbeat messages occur when the time limit has been reached.
builder.setHeartbeatMessage(timeLimitReached);
if (timeLimitReached && rsh.needCursor) {
builder.setHeartbeatMessage(true);
if (rsh.needCursor) {
Cell cursorCell = scannerContext.getLastPeekedCell();
if (cursorCell != null ) {
if (cursorCell != null) {
builder.setCursor(ProtobufUtil.toCursor(cursorCell));
}
}

View File

@ -33,6 +33,7 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@ -1052,4 +1054,77 @@ public class TestFromClientSide3 {
return s;
}
}
private static byte[] generateHugeValue(int size) {
Random rand = ThreadLocalRandom.current();
byte[] value = new byte[size];
for (int i = 0; i < value.length; i++) {
value[i] = (byte) rand.nextInt(256);
}
return value;
}
@Test
public void testScanWithBatchSizeReturnIncompleteCells() throws IOException {
TableName tableName = TableName.valueOf(name.getMethodName());
TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
.build();
Table table = TEST_UTIL.createTable(hd, null);
Put put = new Put(ROW);
put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
table.put(put);
put = new Put(ROW);
put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
table.put(put);
for (int i = 2; i < 5; i++) {
for (int version = 0; version < 2; version++) {
put = new Put(ROW);
put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024));
table.put(put);
}
}
Scan scan = new Scan();
scan.withStartRow(ROW).withStopRow(ROW).addFamily(FAMILY).setBatch(3)
.setMaxResultSize(4 * 1024 * 1024);
Result result;
try (ResultScanner scanner = table.getScanner(scan)) {
List<Result> list = new ArrayList<>();
/*
* The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The second
* scan rpc should return a result with 3 cells, because reach the batch limit = 3; The
* mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the
* moreResultsInRegion also would be false. Finally, the client should collect all the cells
* into two result: 2+3 -> 3+2;
*/
while ((result = scanner.next()) != null) {
list.add(result);
}
Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
Assert.assertEquals(2, list.size());
Assert.assertEquals(3, list.get(0).size());
Assert.assertEquals(2, list.get(1).size());
}
scan = new Scan();
scan.withStartRow(ROW).withStopRow(ROW).addFamily(FAMILY).setBatch(2)
.setMaxResultSize(4 * 1024 * 1024);
try (ResultScanner scanner = table.getScanner(scan)) {
List<Result> list = new ArrayList<>();
while ((result = scanner.next()) != null) {
list.add(result);
}
Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
Assert.assertEquals(3, list.size());
Assert.assertEquals(2, list.get(0).size());
Assert.assertEquals(2, list.get(1).size());
Assert.assertEquals(1, list.get(2).size());
}
}
}