From eaf010cd737dc9b0cd4aac9c800e732d37a1ed8f Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Thu, 15 Nov 2018 14:33:47 +0200 Subject: [PATCH] SQL: clear the cursor if nested inner hits are enough to fulfill the query required limits (#35398) --- .../sql/qa/single_node/JdbcCsvSpecIT.java | 7 ++ .../xpack/sql/qa/jdbc/FetchSizeTestCase.java | 65 ++++++++++++++- .../qa/jdbc/SpecBaseIntegrationTestCase.java | 6 +- .../sql/qa/src/main/resources/dep_emp.csv | 2 + .../sql/qa/src/main/resources/nested.csv-spec | 79 ++++++++++++++++++- .../xpack/sql/execution/search/Querier.java | 11 +-- .../sql/execution/search/SearchHitRowSet.java | 10 +-- 7 files changed, 164 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcCsvSpecIT.java b/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcCsvSpecIT.java index 4f841e02ae3..66ac2e2c7df 100644 --- a/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcCsvSpecIT.java +++ b/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcCsvSpecIT.java @@ -12,4 +12,11 @@ public class JdbcCsvSpecIT extends CsvSpecTestCase { public JdbcCsvSpecIT(String fileName, String groupName, String testName, Integer lineNumber, CsvTestCase testCase) { super(fileName, groupName, testName, lineNumber, testCase); } + + @Override + protected int fetchSize() { + // using a smaller fetchSize for nested documents' tests to uncover bugs + // similar with https://github.com/elastic/elasticsearch/issues/35176 quicker + return fileName.startsWith("nested") && randomBoolean() ? randomIntBetween(1,5) : super.fetchSize(); + } } diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java index e0a5bd26db0..fee5901bc4c 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java @@ -6,6 +6,9 @@ package org.elasticsearch.xpack.sql.qa.jdbc; import org.elasticsearch.client.Request; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.junit.Before; import java.io.IOException; @@ -23,12 +26,42 @@ import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.assertNoSearch public class FetchSizeTestCase extends JdbcIntegrationTestCase { @Before public void createTestIndex() throws IOException { - Request request = new Request("PUT", "/test/doc/_bulk"); + Request request = new Request("PUT", "/test"); + XContentBuilder createIndex = JsonXContent.contentBuilder().startObject(); + createIndex.startObject("mappings"); + { + createIndex.startObject("doc"); + { + createIndex.startObject("properties"); + { + createIndex.startObject("nested").field("type", "nested"); + createIndex.startObject("properties"); + createIndex.startObject("inner_field").field("type", "integer").endObject(); + createIndex.endObject(); + createIndex.endObject(); + } + createIndex.endObject(); + } + createIndex.endObject(); + } + createIndex.endObject().endObject(); + request.setJsonEntity(Strings.toString(createIndex)); + client().performRequest(request); + + request = new Request("PUT", "/test/doc/_bulk"); request.addParameter("refresh", "true"); StringBuilder bulk = new StringBuilder(); + StringBuilder bulkLine; for (int i = 0; i < 20; i++) { bulk.append("{\"index\":{}}\n"); - bulk.append("{\"test_field\":" + i + "}\n"); + bulkLine = new StringBuilder("{\"test_field\":" + i); + bulkLine.append(", \"nested\":["); + // each document will have a nested field with 1 - 5 values + for (int j = 0; j <= i % 5; j++) { + bulkLine.append("{\"inner_field\":" + j + "}" + ((j == i % 5) ? "" : ",")); + } + bulkLine.append("]"); + bulk.append(bulkLine).append("}\n"); } request.setJsonEntity(bulk.toString()); client().performRequest(request); @@ -92,4 +125,32 @@ public class FetchSizeTestCase extends JdbcIntegrationTestCase { } } } + + /** + * Test for nested documents. + */ + public void testNestedDocuments() throws Exception { + try (Connection c = esJdbc(); + Statement s = c.createStatement()) { + s.setFetchSize(5); + try (ResultSet rs = s.executeQuery("SELECT test_field, nested.* FROM test ORDER BY test_field ASC")) { + assertTrue("Empty result set!", rs.next()); + for (int i = 0; i < 20; i++) { + assertEquals(15, rs.getFetchSize()); + assertNestedDocuments(rs, i); + } + assertFalse(rs.next()); + } + } + assertNoSearchContexts(); + } + + private void assertNestedDocuments(ResultSet rs, int i) throws SQLException { + for (int j = 0; j <= i % 5; j++) { + assertEquals(i, rs.getInt(1)); + assertEquals(j, rs.getInt(2)); + // don't check the very last row in the result set + assertTrue("No more entries left after row " + rs.getRow(), (i+j == 23 || rs.next())); + } + } } diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SpecBaseIntegrationTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SpecBaseIntegrationTestCase.java index a7d0332508f..682fb824b70 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SpecBaseIntegrationTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SpecBaseIntegrationTestCase.java @@ -103,10 +103,14 @@ public abstract class SpecBaseIntegrationTestCase extends JdbcIntegrationTestCas protected ResultSet executeJdbcQuery(Connection con, String query) throws SQLException { Statement statement = con.createStatement(); - statement.setFetchSize(between(1, 500)); + statement.setFetchSize(fetchSize()); return statement.executeQuery(query); } + protected int fetchSize() { + return between(1, 500); + } + // TODO: use UTC for now until deciding on a strategy for handling date extraction @Override protected Properties connectionProperties() { diff --git a/x-pack/plugin/sql/qa/src/main/resources/dep_emp.csv b/x-pack/plugin/sql/qa/src/main/resources/dep_emp.csv index ece933b3941..5b669eaf451 100644 --- a/x-pack/plugin/sql/qa/src/main/resources/dep_emp.csv +++ b/x-pack/plugin/sql/qa/src/main/resources/dep_emp.csv @@ -107,5 +107,7 @@ emp_no,dep_id,from_date,to_date 10097,d008,1990-09-15,9999-01-01 10098,d004,1985-05-13,1989-06-29 10098,d009,1989-06-29,1992-12-11 +10098,d008,1992-12-11,1993-05-05 +10098,d007,1993-05-05,1994-02-01 10099,d007,1988-10-18,9999-01-01 10100,d003,1987-09-21,9999-01-01 diff --git a/x-pack/plugin/sql/qa/src/main/resources/nested.csv-spec b/x-pack/plugin/sql/qa/src/main/resources/nested.csv-spec index 89808901e9c..3be0547fd43 100644 --- a/x-pack/plugin/sql/qa/src/main/resources/nested.csv-spec +++ b/x-pack/plugin/sql/qa/src/main/resources/nested.csv-spec @@ -102,7 +102,7 @@ Mayuko | 12 selectWithScalarOnNested SELECT first_name f, last_name l, YEAR(dep.from_date) start FROM test_emp WHERE dep.dep_name = 'Production' AND languages > 1 ORDER BY start LIMIT 5; -f:s | l:s | start:i +f:s | l:s | start:i Sreekrishna |Servieres |1985 Zhongwei |Rosen |1986 @@ -137,7 +137,7 @@ d003 |Azuma d002 |Baek d003 |Baek d004 |Bamford -; +; selectNestedFieldLast SELECT first_name, dep.dep_id FROM test_emp ORDER BY first_name LIMIT 5; @@ -222,3 +222,78 @@ Anneke |d005 |Development |1990-08-05T00:00:00.000Z|9999-01 Anoosh |d005 |Development |1991-08-30T00:00:00.000Z|9999-01-01T00:00:00.000Z|Peyn Arumugam |d008 |Research |1987-04-18T00:00:00.000Z|1997-11-08T00:00:00.000Z|Ossenbruggen ; + +// +// Nested documents tests more targetted for JdbcCsvNestedDocsIT class (with specific fetch_size value) +// + +// employee 10098 has 4 departments + +selectNestedFieldWithFourInnerHitsAndLimitOne +SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 1; + + dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i +---------------+----------------+---------------+--------------- +d004 |Production |Sreekrishna |10098 +; + +selectNestedFieldWithFourInnerHitsAndLimitTwo +SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 2; + + dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i +---------------+----------------+---------------+--------------- +d004 |Production |Sreekrishna |10098 +d009 |Customer Service|Sreekrishna |10098 +; + +selectNestedFieldWithFourInnerHitsAndLimitThree +SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 3; + + dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i +---------------+----------------+---------------+--------------- +d004 |Production |Sreekrishna |10098 +d009 |Customer Service|Sreekrishna |10098 +d008 |Research |Sreekrishna |10098 +; + +selectNestedFieldWithFourInnerHitsAndLimitFour +SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 4; + + dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i +---------------+----------------+---------------+--------------- +d004 |Production |Sreekrishna |10098 +d009 |Customer Service|Sreekrishna |10098 +d008 |Research |Sreekrishna |10098 +d007 |Sales |Sreekrishna |10098 +; + +selectNestedFieldWithFourInnerHitsAndLimitFive +SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 5; + + dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i +---------------+----------------+---------------+--------------- +d004 |Production |Sreekrishna |10098 +d009 |Customer Service|Sreekrishna |10098 +d008 |Research |Sreekrishna |10098 +d007 |Sales |Sreekrishna |10098 +; + +selectNestedFieldFromTwoDocumentsWithFourInnerHitsAndLimitFive +SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 OR emp_no=10099 LIMIT 5; + + dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i +---------------+----------------+---------------+--------------- +d004 |Production |Sreekrishna |10098 +d009 |Customer Service|Sreekrishna |10098 +d008 |Research |Sreekrishna |10098 +d007 |Sales |Sreekrishna |10098 +d007 |Sales |Valter |10099 +; + +selectNestedFieldFromDocumentWithOneInnerHitAndLimitOne +SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10099 LIMIT 1; + + dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i +---------------+----------------+---------------+--------------- +d007 |Sales |Valter |10099 +; \ No newline at end of file diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java index f3a397dc68b..df6859cc635 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java @@ -318,19 +318,20 @@ public class Querier { // there are some results if (hits.length > 0) { String scrollId = response.getScrollId(); - + SchemaSearchHitRowSet hitRowSet = new SchemaSearchHitRowSet(schema, exts, hits, query.limit(), scrollId); + // if there's an id, try to setup next scroll if (scrollId != null && // is all the content already retrieved? - (Boolean.TRUE.equals(response.isTerminatedEarly()) || response.getHits().getTotalHits() == hits.length - // or maybe the limit has been reached - || (hits.length >= query.limit() && query.limit() > -1))) { + (Boolean.TRUE.equals(response.isTerminatedEarly()) + || response.getHits().getTotalHits() == hits.length + || hitRowSet.isLimitReached())) { // if so, clear the scroll clear(response.getScrollId(), ActionListener.wrap( succeeded -> listener.onResponse(new SchemaSearchHitRowSet(schema, exts, hits, query.limit(), null)), listener::onFailure)); } else { - listener.onResponse(new SchemaSearchHitRowSet(schema, exts, hits, query.limit(), scrollId)); + listener.onResponse(hitRowSet); } } // no hits diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java index e8994bf108f..ba3682df5cc 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java @@ -23,7 +23,6 @@ import java.util.Set; class SearchHitRowSet extends AbstractRowSet { private final SearchHit[] hits; private final Cursor cursor; - private final String scrollId; private final List extractors; private final Set innerHits = new LinkedHashSet<>(); private final String innerHit; @@ -35,7 +34,6 @@ class SearchHitRowSet extends AbstractRowSet { SearchHitRowSet(List exts, SearchHit[] hits, int limit, String scrollId) { this.hits = hits; - this.scrollId = scrollId; this.extractors = exts; // Since the results might contain nested docs, the iteration is similar to that of Aggregation @@ -91,6 +89,10 @@ class SearchHitRowSet extends AbstractRowSet { } } } + + protected boolean isLimitReached() { + return cursor == Cursor.EMPTY; + } @Override public int columnCount() { @@ -166,10 +168,6 @@ class SearchHitRowSet extends AbstractRowSet { return size; } - public String scrollId() { - return scrollId; - } - @Override public Cursor nextPageCursor() { return cursor;