HBASE-21206 Scan with batch size may return incomplete cells
This commit is contained in:
parent
cd161d976e
commit
ddd30a2241
|
@ -3282,6 +3282,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
break;
|
||||
}
|
||||
}
|
||||
} else if (!moreRows && !results.isEmpty()) {
|
||||
// No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
|
||||
// last result is false. Otherwise it's possible that: the first nextRaw returned
|
||||
// because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
|
||||
// last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
|
||||
// return with moreRows=false, which means moreResultsInRegion would be false, it will
|
||||
// be a contradictory state (HBASE-21206).
|
||||
int lastIdx = results.size() - 1;
|
||||
Result r = results.get(lastIdx);
|
||||
if (r.mayHaveMoreCellsInRow()) {
|
||||
results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
|
||||
}
|
||||
}
|
||||
boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
|
||||
boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
|
||||
|
@ -3297,12 +3309,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
// there are more values to be read server side. If there aren't more values,
|
||||
// marking it as a heartbeat is wasteful because the client will need to issue
|
||||
// another ScanRequest only to realize that they already have all the values
|
||||
if (moreRows) {
|
||||
if (moreRows && timeLimitReached) {
|
||||
// Heartbeat messages occur when the time limit has been reached.
|
||||
builder.setHeartbeatMessage(timeLimitReached);
|
||||
if (timeLimitReached && rsh.needCursor) {
|
||||
builder.setHeartbeatMessage(true);
|
||||
if (rsh.needCursor) {
|
||||
Cell cursorCell = scannerContext.getLastPeekedCell();
|
||||
if (cursorCell != null ) {
|
||||
if (cursorCell != null) {
|
||||
builder.setCursor(ProtobufUtil.toCursor(cursorCell));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import java.util.Random;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -1052,4 +1054,77 @@ public class TestFromClientSide3 {
|
|||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
private static byte[] generateHugeValue(int size) {
|
||||
Random rand = ThreadLocalRandom.current();
|
||||
byte[] value = new byte[size];
|
||||
for (int i = 0; i < value.length; i++) {
|
||||
value[i] = (byte) rand.nextInt(256);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScanWithBatchSizeReturnIncompleteCells() throws IOException {
|
||||
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
|
||||
.build();
|
||||
|
||||
Table table = TEST_UTIL.createTable(hd, null);
|
||||
|
||||
Put put = new Put(ROW);
|
||||
put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
|
||||
table.put(put);
|
||||
|
||||
put = new Put(ROW);
|
||||
put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
|
||||
table.put(put);
|
||||
|
||||
for (int i = 2; i < 5; i++) {
|
||||
for (int version = 0; version < 2; version++) {
|
||||
put = new Put(ROW);
|
||||
put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024));
|
||||
table.put(put);
|
||||
}
|
||||
}
|
||||
|
||||
Scan scan = new Scan();
|
||||
scan.withStartRow(ROW).withStopRow(ROW).addFamily(FAMILY).setBatch(3)
|
||||
.setMaxResultSize(4 * 1024 * 1024);
|
||||
Result result;
|
||||
try (ResultScanner scanner = table.getScanner(scan)) {
|
||||
List<Result> list = new ArrayList<>();
|
||||
/*
|
||||
* The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The second
|
||||
* scan rpc should return a result with 3 cells, because reach the batch limit = 3; The
|
||||
* mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the
|
||||
* moreResultsInRegion also would be false. Finally, the client should collect all the cells
|
||||
* into two result: 2+3 -> 3+2;
|
||||
*/
|
||||
while ((result = scanner.next()) != null) {
|
||||
list.add(result);
|
||||
}
|
||||
|
||||
Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
|
||||
Assert.assertEquals(2, list.size());
|
||||
Assert.assertEquals(3, list.get(0).size());
|
||||
Assert.assertEquals(2, list.get(1).size());
|
||||
}
|
||||
|
||||
scan = new Scan();
|
||||
scan.withStartRow(ROW).withStopRow(ROW).addFamily(FAMILY).setBatch(2)
|
||||
.setMaxResultSize(4 * 1024 * 1024);
|
||||
try (ResultScanner scanner = table.getScanner(scan)) {
|
||||
List<Result> list = new ArrayList<>();
|
||||
while ((result = scanner.next()) != null) {
|
||||
list.add(result);
|
||||
}
|
||||
Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
|
||||
Assert.assertEquals(3, list.size());
|
||||
Assert.assertEquals(2, list.get(0).size());
|
||||
Assert.assertEquals(2, list.get(1).size());
|
||||
Assert.assertEquals(1, list.get(2).size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue