HBASE-24267 add limit support for scan in rest api (#1591)
Co-authored-by: zhuqi <zhuqi1@xiaomi.com> Signed-off-by: Michael Stack <stack@apache.org> Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
parent
6eb5cafe34
commit
7e9bd3606f
|
@ -30,4 +30,5 @@ message Scanner {
|
|||
optional int32 caching = 9; // specifies REST scanner caching
|
||||
repeated string labels = 10;
|
||||
optional bool cacheBlocks = 11; // server side block caching hint
|
||||
optional int32 limit = 12;
|
||||
}
|
||||
|
|
|
@ -99,7 +99,7 @@ public class ScannerResource extends ResourceBase {
|
|||
String tableName = tableResource.getName();
|
||||
ScannerResultGenerator gen =
|
||||
new ScannerResultGenerator(tableName, spec, filter, model.getCaching(),
|
||||
model.getCacheBlocks());
|
||||
model.getCacheBlocks(), model.getLimit());
|
||||
String id = gen.getID();
|
||||
ScannerInstanceResource instance =
|
||||
new ScannerInstanceResource(tableName, id, gen, model.getBatch());
|
||||
|
|
|
@ -69,6 +69,11 @@ public class ScannerResultGenerator extends ResultGenerator {
|
|||
public ScannerResultGenerator(final String tableName, final RowSpec rowspec,
|
||||
final Filter filter, final int caching, final boolean cacheBlocks)
|
||||
throws IllegalArgumentException, IOException {
|
||||
this(tableName, rowspec, filter, caching, cacheBlocks, -1);
|
||||
}
|
||||
|
||||
public ScannerResultGenerator(final String tableName, final RowSpec rowspec,
|
||||
final Filter filter, final int caching ,final boolean cacheBlocks, int limit) throws IOException {
|
||||
Table table = RESTServlet.getInstance().getTable(tableName);
|
||||
try {
|
||||
Scan scan;
|
||||
|
@ -98,6 +103,9 @@ public class ScannerResultGenerator extends ResultGenerator {
|
|||
if (caching > 0 ) {
|
||||
scan.setCaching(caching);
|
||||
}
|
||||
if (limit > 0) {
|
||||
scan.setLimit(limit);
|
||||
}
|
||||
scan.setCacheBlocks(cacheBlocks);
|
||||
if (rowspec.hasLabels()) {
|
||||
scan.setAuthorizations(new Authorizations(rowspec.getLabels()));
|
||||
|
|
|
@ -118,6 +118,7 @@ public class ScannerModel implements ProtobufMessageHandler, Serializable {
|
|||
private int caching = -1;
|
||||
private List<String> labels = new ArrayList<>();
|
||||
private boolean cacheBlocks = true;
|
||||
private int limit = -1;
|
||||
|
||||
/**
|
||||
* Implement lazily-instantiated singleton as per recipe
|
||||
|
@ -542,6 +543,9 @@ public class ScannerModel implements ProtobufMessageHandler, Serializable {
|
|||
if (maxVersions > 0) {
|
||||
model.setMaxVersions(maxVersions);
|
||||
}
|
||||
if (scan.getLimit() > 0) {
|
||||
model.setLimit(scan.getLimit());
|
||||
}
|
||||
Filter filter = scan.getFilter();
|
||||
if (filter != null) {
|
||||
model.setFilter(stringifyFilter(filter));
|
||||
|
@ -686,6 +690,14 @@ public class ScannerModel implements ProtobufMessageHandler, Serializable {
|
|||
return caching;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the limit specification
|
||||
*/
|
||||
@XmlAttribute
|
||||
public int getLimit() {
|
||||
return limit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if HFile blocks should be cached on the servers for this scan, false otherwise
|
||||
*/
|
||||
|
@ -768,6 +780,13 @@ public class ScannerModel implements ProtobufMessageHandler, Serializable {
|
|||
this.cacheBlocks = value;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param limit the number of rows can fetch of each scanner at lifetime
|
||||
*/
|
||||
public void setLimit(int limit) {
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maxVersions maximum number of versions to return
|
||||
*/
|
||||
|
@ -818,6 +837,9 @@ public class ScannerModel implements ProtobufMessageHandler, Serializable {
|
|||
if (caching > 0) {
|
||||
builder.setCaching(caching);
|
||||
}
|
||||
if (limit > 0){
|
||||
builder.setLimit(limit);
|
||||
}
|
||||
builder.setMaxVersions(maxVersions);
|
||||
if (filter != null) {
|
||||
builder.setFilter(filter);
|
||||
|
@ -850,6 +872,9 @@ public class ScannerModel implements ProtobufMessageHandler, Serializable {
|
|||
if (builder.hasCaching()) {
|
||||
caching = builder.getCaching();
|
||||
}
|
||||
if (builder.hasLimit()) {
|
||||
limit = builder.getLimit();
|
||||
}
|
||||
if (builder.hasStartTime()) {
|
||||
startTime = builder.getStartTime();
|
||||
}
|
||||
|
|
|
@ -580,6 +580,49 @@ public class TestRemoteTable {
|
|||
assertTrue(response.hasBody());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests scanner with limitation
|
||||
* limit the number of rows each scanner scan fetch at life time
|
||||
* The number of rows returned should be equal to the limit
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testLimitedScan() throws Exception {
|
||||
int numTrials = 100;
|
||||
int limit = 60;
|
||||
|
||||
// Truncate the test table for inserting test scenarios rows keys
|
||||
TEST_UTIL.getAdmin().disableTable(TABLE);
|
||||
TEST_UTIL.getAdmin().truncateTable(TABLE, false);
|
||||
String row = "testrow";
|
||||
|
||||
try (Table table = TEST_UTIL.getConnection().getTable(TABLE)) {
|
||||
List<Put> puts = new ArrayList<>();
|
||||
Put put = null;
|
||||
for (int i = 1; i <= numTrials; i++) {
|
||||
put = new Put(Bytes.toBytes(row + i));
|
||||
put.addColumn(COLUMN_1, QUALIFIER_1, TS_2, Bytes.toBytes("testvalue" + i));
|
||||
puts.add(put);
|
||||
}
|
||||
table.put(puts);
|
||||
}
|
||||
|
||||
remoteTable =
|
||||
new RemoteHTable(new Client(new Cluster().add("localhost", REST_TEST_UTIL.getServletPort())),
|
||||
TEST_UTIL.getConfiguration(), TABLE.toBytes());
|
||||
|
||||
Scan scan = new Scan();
|
||||
scan.setLimit(limit);
|
||||
ResultScanner scanner = remoteTable.getScanner(scan);
|
||||
Iterator<Result> resultIterator = scanner.iterator();
|
||||
int counter = 0;
|
||||
while (resultIterator.hasNext()) {
|
||||
resultIterator.next();
|
||||
counter++;
|
||||
}
|
||||
assertEquals(limit, counter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests keeping a HBase scanner alive for long periods of time. Each call to next() should reset
|
||||
* the ConnectionCache timeout for the scanner's connection.
|
||||
|
|
|
@ -47,27 +47,27 @@ public class TestScannerModel extends TestModelBase<ScannerModel> {
|
|||
private static final long START_TIME = 1245219839331L;
|
||||
private static final long END_TIME = 1245393318192L;
|
||||
private static final int CACHING = 1000;
|
||||
private static final int LIMIT = 10000;
|
||||
private static final int BATCH = 100;
|
||||
private static final boolean CACHE_BLOCKS = false;
|
||||
|
||||
public TestScannerModel() throws Exception {
|
||||
super(ScannerModel.class);
|
||||
|
||||
AS_XML = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>"
|
||||
+ "<Scanner batch=\"100\" cacheBlocks=\"false\" caching=\"1000\" endRow=\"enp5eng=\" "
|
||||
+ "endTime=\"1245393318192\" maxVersions=\"2147483647\" startRow=\"YWJyYWNhZGFicmE=\" "
|
||||
+ "startTime=\"1245219839331\">"
|
||||
+ "<Scanner batch=\"100\" cacheBlocks=\"false\" caching=\"1000\" endRow=\"enp5eng=\" endTime=\"1245393318192\""
|
||||
+ " limit=\"10000\" maxVersions=\"2147483647\" startRow=\"YWJyYWNhZGFicmE=\" startTime=\"1245219839331\">"
|
||||
+ "<column>Y29sdW1uMQ==</column> <column>Y29sdW1uMjpmb28=</column>"
|
||||
+ "<labels>private</labels><labels>public</labels>"
|
||||
+ "</Scanner>";
|
||||
+ "<labels>private</labels> <labels>public</labels></Scanner>";
|
||||
|
||||
AS_JSON = "{\"batch\":100,\"caching\":1000,\"cacheBlocks\":false,\"endRow\":\"enp5eng=\","
|
||||
+ "\"endTime\":1245393318192,\"maxVersions\":2147483647,\"startRow\":\"YWJyYWNhZGFicmE=\","
|
||||
+ "\"startTime\":1245219839331,\"column\":[\"Y29sdW1uMQ==\",\"Y29sdW1uMjpmb28=\"],"
|
||||
+"\"labels\":[\"private\",\"public\"]"
|
||||
+"}";
|
||||
+"\"labels\":[\"private\",\"public\"],"
|
||||
+"\"limit\":10000}";
|
||||
|
||||
AS_PB = "CgthYnJhY2FkYWJyYRIFenp5engaB2NvbHVtbjEaC2NvbHVtbjI6Zm9vIGQo47qL554kMLDi57mf"
|
||||
+ "JDj/////B0joB1IHcHJpdmF0ZVIGcHVibGljWAA=";
|
||||
AS_PB = "CgthYnJhY2FkYWJyYRIFenp5engaB2NvbHVtbjEaC2NvbHVtbjI6Zm9vIGQo47qL554kMLDi57mfJDj"
|
||||
+"/////B0joB1IHcHJpdmF0ZVIGcHVibGljWABgkE4=";
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -84,6 +84,7 @@ public class TestScannerModel extends TestModelBase<ScannerModel> {
|
|||
model.addLabel(PRIVATE);
|
||||
model.addLabel(PUBLIC);
|
||||
model.setCacheBlocks(CACHE_BLOCKS);
|
||||
model.setLimit(LIMIT);
|
||||
return model;
|
||||
}
|
||||
|
||||
|
@ -104,6 +105,7 @@ public class TestScannerModel extends TestModelBase<ScannerModel> {
|
|||
assertEquals(START_TIME, model.getStartTime());
|
||||
assertEquals(END_TIME, model.getEndTime());
|
||||
assertEquals(BATCH, model.getBatch());
|
||||
assertEquals(LIMIT, model.getLimit());
|
||||
assertEquals(CACHING, model.getCaching());
|
||||
assertEquals(CACHE_BLOCKS, model.getCacheBlocks());
|
||||
boolean foundLabel1 = false;
|
||||
|
|
Loading…
Reference in New Issue