Make SearchCursor limit aware

Original commit: elastic/x-pack-elasticsearch@c4839bc293
This commit is contained in:
Costin Leau 2017-10-10 19:22:42 +03:00
parent ed712d0e3f
commit 1cd6fb23ec
6 changed files with 27 additions and 16 deletions

View File

@ -93,9 +93,8 @@ public class JdbcAssert {
int columns = metaData.getColumnCount(); int columns = metaData.getColumnCount();
long count = 0; long count = 0;
while (expected.next()) { for (count = 0; expected.next(); count++) {
assertTrue("Expected more data but no more entries found after [" + count + "]", actual.next()); assertTrue("Expected more data but no more entries found after [" + count + "]", actual.next());
count++;
if (logger != null) { if (logger != null) {
JdbcTestUtils.logResultSetCurrentData(actual, logger); JdbcTestUtils.logResultSetCurrentData(actual, logger);
@ -122,7 +121,7 @@ public class JdbcAssert {
} }
} }
} }
assertEquals("[" + actual + "] still has data after [" + count + "] entries", expected.next(), actual.next()); assertEquals("Elasticsearch [" + actual + "] still has data after [" + count + "] entries", expected.next(), actual.next());
} }
private static Object getTime(ResultSet rs, int column) throws SQLException { private static Object getTime(ResultSet rs, int column) throws SQLException {

View File

@ -3,6 +3,6 @@
// //
debug debug
SELECT emp_no, CAST(CEIL(emp_no) AS INT) m, first_name FROM "test_emp" WHERE CEIL(emp_no) < 10010 ORDER BY CEIL(emp_no); SELECT CAST(emp_no AS VARCHAR) AS emp_no_cast FROM "test_emp" ORDER BY emp_no LIMIT 5;
//SELECT YEAR(birth_date) AS d, CAST(SUM(emp_no) AS INT) s FROM "test_emp" GROUP BY YEAR(birth_date) ORDER BY YEAR(birth_date) LIMIT 5; //SELECT YEAR(birth_date) AS d, CAST(SUM(emp_no) AS INT) s FROM "test_emp" GROUP BY YEAR(birth_date) ORDER BY YEAR(birth_date) LIMIT 5;
//SELECT emp_no, SIN(emp_no) + emp_no % 10000 + YEAR(hire_date) / 1000 AS s, emp_no AS y FROM test_emp WHERE emp_no = 10010; //SELECT emp_no, SIN(emp_no) + emp_no % 10000 + YEAR(hire_date) / 1000 AS s, emp_no AS y FROM test_emp WHERE emp_no = 10010;

View File

@ -42,21 +42,25 @@ public class ScrollCursor implements Cursor {
private final String scrollId; private final String scrollId;
private final List<HitExtractor> extractors; private final List<HitExtractor> extractors;
private final int limit;
public ScrollCursor(String scrollId, List<HitExtractor> extractors) { public ScrollCursor(String scrollId, List<HitExtractor> extractors, int limit) {
this.scrollId = scrollId; this.scrollId = scrollId;
this.extractors = extractors; this.extractors = extractors;
this.limit = limit;
} }
public ScrollCursor(StreamInput in) throws IOException { public ScrollCursor(StreamInput in) throws IOException {
scrollId = in.readString(); scrollId = in.readString();
extractors = in.readNamedWriteableList(HitExtractor.class); extractors = in.readNamedWriteableList(HitExtractor.class);
limit = in.readVInt();
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeString(scrollId); out.writeString(scrollId);
out.writeNamedWriteableList(extractors); out.writeNamedWriteableList(extractors);
out.writeVInt(limit);
} }
public ScrollCursor(java.io.Reader reader) throws IOException { public ScrollCursor(java.io.Reader reader) throws IOException {
@ -80,6 +84,7 @@ public class ScrollCursor implements Cursor {
} }
})); StreamInput in = new NamedWriteableAwareStreamInput(delegate, REGISTRY)) { })); StreamInput in = new NamedWriteableAwareStreamInput(delegate, REGISTRY)) {
extractors = in.readNamedWriteableList(HitExtractor.class); extractors = in.readNamedWriteableList(HitExtractor.class);
limit = in.readVInt();
} }
} }
@ -94,6 +99,7 @@ public class ScrollCursor implements Cursor {
} }
}))) { }))) {
out.writeNamedWriteableList(extractors); out.writeNamedWriteableList(extractors);
out.writeVInt(limit);
} }
} }
@ -121,7 +127,7 @@ public class ScrollCursor implements Cursor {
*/ */
SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(timeValueSeconds(90)); SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(timeValueSeconds(90));
client.searchScroll(request, ActionListener.wrap((SearchResponse response) -> { client.searchScroll(request, ActionListener.wrap((SearchResponse response) -> {
int limitHits = -1; // NOCOMMIT do a thing with this int limitHits = limit;
listener.onResponse(new SearchHitRowSetCursor(schema, extractors, response.getHits().getHits(), listener.onResponse(new SearchHitRowSetCursor(schema, extractors, response.getHits().getHits(),
limitHits, response.getScrollId())); limitHits, response.getScrollId()));
}, listener::onFailure)); }, listener::onFailure));
@ -134,12 +140,13 @@ public class ScrollCursor implements Cursor {
} }
ScrollCursor other = (ScrollCursor) obj; ScrollCursor other = (ScrollCursor) obj;
return Objects.equals(scrollId, other.scrollId) return Objects.equals(scrollId, other.scrollId)
&& Objects.equals(extractors, other.extractors); && Objects.equals(extractors, other.extractors)
&& Objects.equals(limit, other.limit);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(scrollId, extractors); return Objects.hash(scrollId, extractors, limit);
} }
@Override @Override

View File

@ -240,8 +240,7 @@ public class Scroller {
scrollId = null; scrollId = null;
} }
} }
int limitHits = query.limit() > 0 && hits.length >= query.limit() ? query.limit() : -1; return new SearchHitRowSetCursor(schema, exts, hits, query.limit(), scrollId);
return new SearchHitRowSetCursor(schema, exts, hits, limitHits, scrollId);
} }
// no hits // no hits
else { else {

View File

@ -27,6 +27,7 @@ public class SearchHitRowSetCursor extends AbstractRowSet {
private final List<HitExtractor> extractors; private final List<HitExtractor> extractors;
private final Set<String> innerHits = new LinkedHashSet<>(); private final Set<String> innerHits = new LinkedHashSet<>();
private final String innerHit; private final String innerHit;
private final int limit;
private final int size; private final int size;
private final int[] indexPerLevel; private final int[] indexPerLevel;
@ -74,6 +75,9 @@ public class SearchHitRowSetCursor extends AbstractRowSet {
} }
} }
} }
// overall limit
limit = limitHits;
// page size
size = limitHits < 0 ? sz : Math.min(sz, limitHits); size = limitHits < 0 ? sz : Math.min(sz, limitHits);
indexPerLevel = new int[maxDepth + 1]; indexPerLevel = new int[maxDepth + 1];
this.innerHit = innerHit; this.innerHit = innerHit;
@ -100,12 +104,12 @@ public class SearchHitRowSetCursor extends AbstractRowSet {
@Override @Override
protected boolean doHasCurrent() { protected boolean doHasCurrent() {
return row < size(); return row < size;
} }
@Override @Override
protected boolean doNext() { protected boolean doNext() {
if (row < size() - 1) { if (row < size - 1) {
row++; row++;
// increment last row // increment last row
indexPerLevel[indexPerLevel.length - 1]++; indexPerLevel[indexPerLevel.length - 1]++;
@ -159,10 +163,12 @@ public class SearchHitRowSetCursor extends AbstractRowSet {
* scroll but all results fit in the first page. */ * scroll but all results fit in the first page. */
return Cursor.EMPTY; return Cursor.EMPTY;
} }
if (hits.length == 0) { // compute remaining limit
// NOCOMMIT handle limit int remainingLimit = limit - size;
// if the computed limit is zero, or the size is zero it means either there's nothing left or the limit has been reached
if (size == 0 || remainingLimit == 0) {
return Cursor.EMPTY; return Cursor.EMPTY;
} }
return new ScrollCursor(scrollId, extractors); return new ScrollCursor(scrollId, extractors, remainingLimit);
} }
} }

View File

@ -31,7 +31,7 @@ public class ScrollCursorTests extends AbstractWireSerializingTestCase<ScrollCur
for (int i = 0; i < extractorsSize; i++) { for (int i = 0; i < extractorsSize; i++) {
extractors.add(randomHitExtractor(0)); extractors.add(randomHitExtractor(0));
} }
return new ScrollCursor(randomAlphaOfLength(5), extractors); return new ScrollCursor(randomAlphaOfLength(5), extractors, randomIntBetween(10, 1024));
} }
static HitExtractor randomHitExtractor(int depth) { static HitExtractor randomHitExtractor(int depth) {