From 1cd6fb23ec489b7e2e99928abb5293f301cd47a4 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Tue, 10 Oct 2017 19:22:42 +0300 Subject: [PATCH] Make SearchCursor limit aware Original commit: elastic/x-pack-elasticsearch@c4839bc293d896e7c2c596e37a41d97dbe6cb295 --- .../xpack/qa/sql/jdbc/JdbcAssert.java | 5 ++--- qa/sql/src/main/resources/debug.sql-spec | 2 +- .../xpack/sql/execution/search/ScrollCursor.java | 15 +++++++++++---- .../xpack/sql/execution/search/Scroller.java | 3 +-- .../execution/search/SearchHitRowSetCursor.java | 16 +++++++++++----- .../sql/execution/search/ScrollCursorTests.java | 2 +- 6 files changed, 27 insertions(+), 16 deletions(-) diff --git a/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/jdbc/JdbcAssert.java b/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/jdbc/JdbcAssert.java index 9acb8d215c1..76189865bb0 100644 --- a/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/jdbc/JdbcAssert.java +++ b/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/jdbc/JdbcAssert.java @@ -93,9 +93,8 @@ public class JdbcAssert { int columns = metaData.getColumnCount(); long count = 0; - while (expected.next()) { + for (count = 0; expected.next(); count++) { assertTrue("Expected more data but no more entries found after [" + count + "]", actual.next()); - count++; if (logger != null) { JdbcTestUtils.logResultSetCurrentData(actual, logger); @@ -122,7 +121,7 @@ public class JdbcAssert { } } } - assertEquals("[" + actual + "] still has data after [" + count + "] entries", expected.next(), actual.next()); + assertEquals("Elasticsearch [" + actual + "] still has data after [" + count + "] entries", expected.next(), actual.next()); } private static Object getTime(ResultSet rs, int column) throws SQLException { diff --git a/qa/sql/src/main/resources/debug.sql-spec b/qa/sql/src/main/resources/debug.sql-spec index 961c6e329bf..a7df4b78267 100644 --- a/qa/sql/src/main/resources/debug.sql-spec +++ b/qa/sql/src/main/resources/debug.sql-spec @@ -3,6 +3,6 @@ // debug -SELECT emp_no, CAST(CEIL(emp_no) AS INT) m, first_name FROM "test_emp" WHERE CEIL(emp_no) < 10010 ORDER BY CEIL(emp_no); +SELECT CAST(emp_no AS VARCHAR) AS emp_no_cast FROM "test_emp" ORDER BY emp_no LIMIT 5; //SELECT YEAR(birth_date) AS d, CAST(SUM(emp_no) AS INT) s FROM "test_emp" GROUP BY YEAR(birth_date) ORDER BY YEAR(birth_date) LIMIT 5; //SELECT emp_no, SIN(emp_no) + emp_no % 10000 + YEAR(hire_date) / 1000 AS s, emp_no AS y FROM test_emp WHERE emp_no = 10010; diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java index 25c6ea7b1b3..2c240d5fea4 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java @@ -42,21 +42,25 @@ public class ScrollCursor implements Cursor { private final String scrollId; private final List extractors; + private final int limit; - public ScrollCursor(String scrollId, List extractors) { + public ScrollCursor(String scrollId, List extractors, int limit) { this.scrollId = scrollId; this.extractors = extractors; + this.limit = limit; } public ScrollCursor(StreamInput in) throws IOException { scrollId = in.readString(); extractors = in.readNamedWriteableList(HitExtractor.class); + limit = in.readVInt(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(scrollId); out.writeNamedWriteableList(extractors); + out.writeVInt(limit); } public ScrollCursor(java.io.Reader reader) throws IOException { @@ -80,6 +84,7 @@ public class ScrollCursor implements Cursor { } })); StreamInput in = new NamedWriteableAwareStreamInput(delegate, REGISTRY)) { extractors = in.readNamedWriteableList(HitExtractor.class); + limit = in.readVInt(); } } @@ -94,6 +99,7 @@ public class ScrollCursor implements Cursor { } }))) { out.writeNamedWriteableList(extractors); + out.writeVInt(limit); } } @@ -121,7 +127,7 @@ public class ScrollCursor implements Cursor { */ SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(timeValueSeconds(90)); client.searchScroll(request, ActionListener.wrap((SearchResponse response) -> { - int limitHits = -1; // NOCOMMIT do a thing with this + int limitHits = limit; listener.onResponse(new SearchHitRowSetCursor(schema, extractors, response.getHits().getHits(), limitHits, response.getScrollId())); }, listener::onFailure)); @@ -134,12 +140,13 @@ public class ScrollCursor implements Cursor { } ScrollCursor other = (ScrollCursor) obj; return Objects.equals(scrollId, other.scrollId) - && Objects.equals(extractors, other.extractors); + && Objects.equals(extractors, other.extractors) + && Objects.equals(limit, other.limit); } @Override public int hashCode() { - return Objects.hash(scrollId, extractors); + return Objects.hash(scrollId, extractors, limit); } @Override diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java index cc70a5d63de..3f5d9341be1 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java @@ -240,8 +240,7 @@ public class Scroller { scrollId = null; } } - int limitHits = query.limit() > 0 && hits.length >= query.limit() ? query.limit() : -1; - return new SearchHitRowSetCursor(schema, exts, hits, limitHits, scrollId); + return new SearchHitRowSetCursor(schema, exts, hits, query.limit(), scrollId); } // no hits else { diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSetCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSetCursor.java index 0252918ed64..28f8308a44f 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSetCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSetCursor.java @@ -27,6 +27,7 @@ public class SearchHitRowSetCursor extends AbstractRowSet { private final List extractors; private final Set innerHits = new LinkedHashSet<>(); private final String innerHit; + private final int limit; private final int size; private final int[] indexPerLevel; @@ -74,6 +75,9 @@ public class SearchHitRowSetCursor extends AbstractRowSet { } } } + // overall limit + limit = limitHits; + // page size size = limitHits < 0 ? sz : Math.min(sz, limitHits); indexPerLevel = new int[maxDepth + 1]; this.innerHit = innerHit; @@ -100,12 +104,12 @@ public class SearchHitRowSetCursor extends AbstractRowSet { @Override protected boolean doHasCurrent() { - return row < size(); + return row < size; } @Override protected boolean doNext() { - if (row < size() - 1) { + if (row < size - 1) { row++; // increment last row indexPerLevel[indexPerLevel.length - 1]++; @@ -159,10 +163,12 @@ public class SearchHitRowSetCursor extends AbstractRowSet { * scroll but all results fit in the first page. */ return Cursor.EMPTY; } - if (hits.length == 0) { - // NOCOMMIT handle limit + // compute remaining limit + int remainingLimit = limit - size; + // if the computed limit is zero, or the size is zero it means either there's nothing left or the limit has been reached + if (size == 0 || remainingLimit == 0) { return Cursor.EMPTY; } - return new ScrollCursor(scrollId, extractors); + return new ScrollCursor(scrollId, extractors, remainingLimit); } } \ No newline at end of file diff --git a/sql/server/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java b/sql/server/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java index 44022ee51da..bb819fc6d34 100644 --- a/sql/server/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java +++ b/sql/server/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java @@ -31,7 +31,7 @@ public class ScrollCursorTests extends AbstractWireSerializingTestCase