diff --git a/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcCsvSpecIT.java b/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcCsvSpecIT.java
index 428bc1c21ef..685a36f483b 100644
--- a/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcCsvSpecIT.java
+++ b/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcCsvSpecIT.java
@@ -32,7 +32,7 @@ public class JdbcCsvSpecIT extends CsvSpecTestCase {
     @Override
     protected int fetchSize() {
         // using a smaller fetchSize for nested documents' tests to uncover bugs
-        // similar with https://github.com/elastic/elasticsearch/issues/35176 quicker
+        // similar to https://github.com/elastic/elasticsearch/issues/35176 quicker
         return fileName.startsWith("nested") && randomBoolean() ? randomIntBetween(1,5) : super.fetchSize();
     }
 }
diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/ErrorsTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/ErrorsTestCase.java
index 5e3b034d757..c5ae7f63ad0 100644
--- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/ErrorsTestCase.java
+++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/ErrorsTestCase.java
@@ -21,4 +21,5 @@ public interface ErrorsTestCase {
     void testSelectGroupByScore() throws Exception;
     void testSelectScoreSubField() throws Exception;
     void testSelectScoreInScalar() throws Exception;
+    void testHardLimitForSortOnAggregate() throws Exception;
 }
diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/CliIntegrationTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/CliIntegrationTestCase.java
index e7a73cd12d5..cf221bbc140 100644
--- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/CliIntegrationTestCase.java
+++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/CliIntegrationTestCase.java
@@ -56,8 +56,8 @@ public abstract class CliIntegrationTestCase extends ESRestTestCase {
         return null;
     }
 
-    protected void index(String index, CheckedConsumer<XContentBuilder, IOException> body) throws IOException {
-        Request request = new Request("PUT", "/" + index + "/_doc/1");
+    protected void index(String index, int docId, CheckedConsumer<XContentBuilder, IOException> body) throws IOException {
+        Request request = new Request("PUT", "/" + index + "/_doc/" + docId);
         request.addParameter("refresh", "true");
         XContentBuilder builder = JsonXContent.contentBuilder().startObject();
         body.accept(builder);
@@ -66,6 +66,10 @@ public abstract class CliIntegrationTestCase extends ESRestTestCase {
         client().performRequest(request);
     }
 
+    protected void index(String index, CheckedConsumer<XContentBuilder, IOException> body) throws IOException {
+        index(index, 1, body);
+    }
+
     public String command(String command) throws IOException {
         return cli.command(command);
     }
diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java
index ca251a31844..a3ad325d0ac 100644
--- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java
+++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java
@@ -97,8 +97,15 @@ public abstract class ErrorsTestCase extends CliIntegrationTestCase implements o
         assertEquals("line 1:12: [SCORE()] cannot be an argument to a function" + END, readLine());
     }
 
+    @Override
+    public void testHardLimitForSortOnAggregate() throws Exception {
+        index("test", body -> body.field("a", 1).field("b", 2));
+        String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000");
+        assertEquals(START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [512], received [10000]" + END,
+            commandResult);
+    }
+
     public static void assertFoundOneProblem(String commandResult) {
         assertEquals(START + "Bad request [[3;33;22mFound 1 problem(s)", commandResult);
     }
-
 }
diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/FetchSizeTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/FetchSizeTestCase.java
index 84f74bcbac1..02de2dff4f7 100644
--- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/FetchSizeTestCase.java
+++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/FetchSizeTestCase.java
@@ -49,4 +49,32 @@ public abstract class FetchSizeTestCase extends CliIntegrationTestCase {
         assertEquals(ErrorsTestCase.START + "Invalid fetch size [[3;33;22m" + Long.MAX_VALUE + ErrorsTestCase.END,
                 command("fetch size = " + Long.MAX_VALUE));
     }
+
+    // Test for issue: https://github.com/elastic/elasticsearch/issues/42851
+    // Even though fetch size and limit are smaller than the noRows, all buckets
+    // should be processed to achieve the global ordering of the aggregate function.
+    public void testOrderingOnAggregate() throws IOException {
+        Request request = new Request("PUT", "/test/_bulk");
+        request.addParameter("refresh", "true");
+        StringBuilder bulk = new StringBuilder();
+        for (int i = 1; i <= 100; i++) {
+            bulk.append("{\"index\":{}}\n");
+            bulk.append("{\"a\":").append(i).append(", \"b\" : ").append(i).append("}\n");
+        }
+        request.setJsonEntity(bulk.toString());
+        client().performRequest(request);
+
+        assertEquals("[?1l>[?1000l[?2004lfetch size set to [90m4[0m", command("fetch size = 4"));
+        assertEquals("[?1l>[?1000l[?2004lfetch separator set to \"[90m -- fetch sep -- [0m\"",
+                command("fetch separator = \" -- fetch sep -- \""));
+        assertThat(command("SELECT max(b) FROM test GROUP BY a ORDER BY max(b) DESC LIMIT 20"), containsString("max(b)"));
+        assertThat(readLine(), containsString("----------"));
+        for (int i = 100; i > 80; i--) {
+            if (i < 100 && i % 4 == 0) {
+                assertThat(readLine(), containsString(" -- fetch sep -- "));
+            }
+            assertThat(readLine(), containsString(Integer.toString(i)));
+        }
+        assertEquals("", readLine());
+    }
 }
diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/DebugCsvSpec.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/DebugCsvSpec.java
deleted file mode 100644
index d5a633e5ea3..00000000000
--- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/DebugCsvSpec.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-package org.elasticsearch.xpack.sql.qa.jdbc;
-
-import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
-
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.test.junit.annotations.TestLogging;
-import org.elasticsearch.xpack.sql.qa.jdbc.CsvTestUtils.CsvTestCase;
-
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-
-import static org.elasticsearch.xpack.sql.qa.jdbc.CsvTestUtils.csvConnection;
-import static org.elasticsearch.xpack.sql.qa.jdbc.CsvTestUtils.executeCsvQuery;
-import static org.elasticsearch.xpack.sql.qa.jdbc.CsvTestUtils.specParser;
-
-@TestLogging("org.elasticsearch.xpack.sql:TRACE")
-public abstract class DebugCsvSpec extends SpecBaseIntegrationTestCase {
-    private final CsvTestCase testCase;
-
-    @ParametersFactory(shuffle = false, argumentFormatting = SqlSpecTestCase.PARAM_FORMATTING)
-    public static List<Object[]> readScriptSpec() throws Exception {
-        Parser parser = specParser();
-        return readScriptSpec("/debug.csv-spec", parser);
-    }
-
-    public DebugCsvSpec(String fileName, String groupName, String testName, Integer lineNumber, CsvTestCase testCase) {
-        super(fileName, groupName, testName, lineNumber);
-        this.testCase = testCase;
-    }
-
-    @Override
-    protected void assertResults(ResultSet expected, ResultSet elastic) throws SQLException {
-        Logger log = logEsResultSet() ? logger : null;
-
-        //
-        // uncomment this to printout the result set and create new CSV tests
-        //
-        JdbcTestUtils.logResultSetMetadata(elastic, log);
-        JdbcTestUtils.logResultSetData(elastic, log);
-        //JdbcAssert.assertResultSets(expected, elastic, log);
-    }
-
-    @Override
-    protected boolean logEsResultSet() {
-        return true;
-    }
-
-    @Override
-    protected final void doTest() throws Throwable {
-        try (Connection csv = csvConnection(testCase); Connection es = esJdbc()) {
-            // pass the testName as table for debugging purposes (in case the underlying reader is missing)
-            ResultSet expected = executeCsvQuery(csv, testName);
-            ResultSet elasticResults = executeJdbcQuery(es, testCase.query);
-            assertResults(expected, elasticResults);
-        }
-    }
-}
\ No newline at end of file
diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/ErrorsTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/ErrorsTestCase.java
index be3ba3d096a..6f12963634f 100644
--- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/ErrorsTestCase.java
+++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/ErrorsTestCase.java
@@ -116,4 +116,14 @@ public class ErrorsTestCase extends JdbcIntegrationTestCase implements org.elast
             assertThat(e.getMessage(), startsWith("Found 1 problem(s)\nline 1:12: [SCORE()] cannot be an argument to a function"));
         }
     }
+
+    @Override
+    public void testHardLimitForSortOnAggregate() throws Exception {
+        index("test", body -> body.field("a", 1).field("b", 2));
+        try (Connection c = esJdbc()) {
+            SQLException e = expectThrows(SQLException.class, () ->
+                c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000").executeQuery());
+            assertEquals("The maximum LIMIT for aggregate sorting is [512], received [10000]", e.getMessage());
+        }
+    }
 }
diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SqlSpecTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SqlSpecTestCase.java
index ef01dc1fca1..cfbec77a3e6 100644
--- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SqlSpecTestCase.java
+++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SqlSpecTestCase.java
@@ -65,6 +65,13 @@ public abstract class SqlSpecTestCase extends SpecBaseIntegrationTestCase {
         this.query = query;
     }
 
+    @Override
+    protected int fetchSize() {
+        // using a smaller fetchSize for nested documents' tests to uncover bugs
+        // similar to https://github.com/elastic/elasticsearch/issues/42581
+        return randomIntBetween(1, 20);
+    }
+
     @Override
     protected final void doTest() throws Throwable {
         // we skip the tests in case of these locales because ES-SQL is Locale-insensitive for now
diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java
index c88f31bb2fd..5a16261bfbb 100644
--- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java
+++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java
@@ -6,7 +6,6 @@
 package org.elasticsearch.xpack.sql.qa.rest;
 
 import com.fasterxml.jackson.core.io.JsonStringEncoder;
-
 import org.apache.http.HttpEntity;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
@@ -314,7 +313,7 @@ public abstract class RestSqlTestCase extends ESRestTestCase implements ErrorsTe
         expectBadRequest(() -> runSql(randomMode(), "SELECT SIN(SCORE()) FROM test"),
                 containsString("line 1:12: [SCORE()] cannot be an argument to a function"));
     }
-
+
+    @Override
+    public void testHardLimitForSortOnAggregate() throws Exception {
+        index("{\"a\": 1, \"b\": 2}");
+        expectBadRequest(() -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000"),
+                containsString("The maximum LIMIT for aggregate sorting is [512], received [10000]"));
+    }
+
     public void testUseColumnarForUnsupportedFormats() throws Exception {
         String format = randomFrom("txt", "csv", "tsv");
         index("{\"foo\":1}");
diff --git a/x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec b/x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec
index 79d58c48e44..9a193d76b31 100644
--- a/x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec
+++ b/x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec
@@ -99,13 +99,13 @@ aggNotSpecifiedWithHavingOnLargeGroupBy
 SELECT MAX(salary) AS max FROM test_emp GROUP BY emp_no HAVING AVG(salary) > 1000 ORDER BY MIN(salary);
 
 aggWithTieBreakerDescAsc
-SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no ASC;
+SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no ASC LIMIT 50;
 
 aggWithTieBreakerDescDesc
-SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no DESC;
+SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no DESC LIMIT 50;
 
 aggWithTieBreakerAscDesc
-SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MAX(languages) ASC NULLS FIRST, emp_no DESC;
+SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MAX(languages) ASC NULLS FIRST, emp_no DESC LIMIT 50;
 
 aggWithMixOfOrdinals
 SELECT gender AS g, MAX(salary) AS m FROM test_emp GROUP BY gender ORDER BY 2 DESC LIMIT 3;
diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java
index 34fab72ca13..ee0bb216e6f 100644
--- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java
+++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java
@@ -143,9 +143,9 @@ public class CompositeAggregationCursor implements Cursor {
                     return;
                 }
 
-                updateCompositeAfterKey(r, query);
-                CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit, serializeQuery(query), includeFrozen,
-                        indices);
+                boolean hasAfterKey = updateCompositeAfterKey(r, query);
+                CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit,
+                        hasAfterKey ? serializeQuery(query) : null, includeFrozen, indices);
                 listener.onResponse(rowSet);
             } catch (Exception ex) {
                 listener.onFailure(ex);
@@ -178,7 +178,7 @@ public class CompositeAggregationCursor implements Cursor {
         throw new SqlIllegalArgumentException("Unrecognized root group found; {}", agg.getClass());
     }
 
-    static void updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) {
+    static boolean updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) {
         CompositeAggregation composite = getComposite(r);
 
         if (composite == null) {
@@ -187,22 +187,25 @@ public class CompositeAggregationCursor implements Cursor {
 
         Map<String, Object> afterKey = composite.afterKey();
         // a null after-key means done
-        if (afterKey != null) {
-            AggregationBuilder aggBuilder = next.aggregations().getAggregatorFactories().iterator().next();
-            // update after-key with the new value
-            if (aggBuilder instanceof CompositeAggregationBuilder) {
-                CompositeAggregationBuilder comp = (CompositeAggregationBuilder) aggBuilder;
-                comp.aggregateAfter(afterKey);
-            } else {
-                throw new SqlIllegalArgumentException("Invalid client request; expected a group-by but instead got {}", aggBuilder);
-            }
+        if (afterKey == null) {
+            return false;
+        }
+
+        AggregationBuilder aggBuilder = next.aggregations().getAggregatorFactories().iterator().next();
+        // update after-key with the new value
+        if (aggBuilder instanceof CompositeAggregationBuilder) {
+            CompositeAggregationBuilder comp = (CompositeAggregationBuilder) aggBuilder;
+            comp.aggregateAfter(afterKey);
+            return true;
+        } else {
+            throw new SqlIllegalArgumentException("Invalid client request; expected a group-by but instead got {}", aggBuilder);
         }
     }
 
     /**
      * Deserializes the search source from a byte array.
      */
-    static SearchSourceBuilder deserializeQuery(NamedWriteableRegistry registry, byte[] source) throws IOException {
+    private static SearchSourceBuilder deserializeQuery(NamedWriteableRegistry registry, byte[] source) throws IOException {
         try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(source), registry)) {
             return new SearchSourceBuilder(in);
         }
diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java
index f93e00eac5a..88b93359d0f 100644
--- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java
+++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java
@@ -28,8 +28,8 @@ class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {
     private final int size;
     private int row = 0;
 
-    CompositeAggsRowSet(List<BucketExtractor> exts, BitSet mask, SearchResponse response, int limit, byte[] next,
-                        boolean includeFrozen, String... indices) {
+    CompositeAggsRowSet(List<BucketExtractor> exts, BitSet mask, SearchResponse response,
+                        int limit, byte[] next, boolean includeFrozen, String... indices) {
         super(exts, mask);
 
         CompositeAggregation composite = CompositeAggregationCursor.getComposite(response);
@@ -40,18 +40,22 @@ class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {
         }
 
         // page size
-        size = limit < 0 ? buckets.size() : Math.min(buckets.size(), limit);
+        size = limit == -1 ? buckets.size() : Math.min(buckets.size(), limit);
 
         if (next == null) {
             cursor = Cursor.EMPTY;
         } else {
-            // compute remaining limit
-            int remainingLimit = limit - size;
+            // Compute remaining limit
+
+            // If the limit is -1 then we have a local sorting (sort on aggregate function) that requires all the buckets
+            // to be processed so we stop only when all data is exhausted.
+            int remainingLimit = (limit == -1) ? limit : ((limit - size) >= 0 ? (limit - size) : 0);
+
             // if the computed limit is zero, or the size is zero it means either there's nothing left or the limit has been reached
             // note that a composite agg might be valid but return zero groups (since these can be filtered with HAVING/bucket selector)
             // however the Querier takes care of that and keeps making requests until either the query is invalid or at least one response
-            // is returned
-            if (next == null || size == 0 || remainingLimit == 0) {
+            // is returned.
+            if (size == 0 || remainingLimit == 0) {
                 cursor = Cursor.EMPTY;
             } else {
                 cursor = new CompositeAggregationCursor(next, exts, mask, remainingLimit, includeFrozen, indices);
@@ -92,4 +96,4 @@ class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {
     public Cursor nextPageCursor() {
         return cursor;
     }
-}
\ No newline at end of file
+}
diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java
index 1a47d43a6c2..78979e112f0 100644
--- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java
+++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java
@@ -117,7 +117,6 @@ public class Querier {
         listener = sortingColumns.isEmpty() ?
                 listener : new LocalAggregationSorterListener(listener, sortingColumns, query.limit());
 
         ActionListener<SearchResponse> l = null;
-
         if (query.isAggsOnly()) {
             if (query.aggs().useImplicitGroupBy()) {
                 l = new ImplicitGroupActionListener(listener, client, cfg, output, query, search);
@@ -134,7 +133,7 @@ public class Querier {
 
     public static SearchRequest prepareRequest(Client client, SearchSourceBuilder source, TimeValue timeout, boolean includeFrozen,
             String... indices) {
-        SearchRequest search = client.prepareSearch(indices)
+        return client.prepareSearch(indices)
                 // always track total hits accurately
                 .setTrackTotalHits(true)
                 .setAllowPartialSearchResults(false)
@@ -143,7 +142,6 @@ public class Querier {
                 .setIndicesOptions(
                         includeFrozen ? IndexResolver.FIELD_CAPS_FROZEN_INDICES_OPTIONS : IndexResolver.FIELD_CAPS_INDICES_OPTIONS)
                 .request();
-        return search;
     }
 
     /**
@@ -158,7 +156,7 @@ public class Querier {
         private final ActionListener<SchemaRowSet> listener;
 
         // keep the top N entries.
-        private final PriorityQueue<Tuple<List<?>, Integer>> data;
+        private final AggSortingQueue data;
         private final AtomicInteger counter = new AtomicInteger();
         private volatile Schema schema;
 
@@ -174,53 +172,12 @@ public class Querier {
             } else {
                 noLimit = false;
                 if (limit > MAXIMUM_SIZE) {
-                    throw new PlanningException("The maximum LIMIT for aggregate sorting is [{}], received [{}]", limit, MAXIMUM_SIZE);
+                    throw new PlanningException("The maximum LIMIT for aggregate sorting is [{}], received [{}]", MAXIMUM_SIZE, limit);
                 } else {
                     size = limit;
                 }
             }
-
-            this.data = new PriorityQueue<Tuple<List<?>, Integer>>(size) {
-
-                // compare row based on the received attribute sort
-                // if a sort item is not in the list, it is assumed the sorting happened in ES
-                // and the results are left as is (by using the row ordering), otherwise it is sorted based on the given criteria.
-                //
-                // Take for example ORDER BY a, x, b, y
-                // a, b - are sorted in ES
-                // x, y - need to be sorted client-side
-                // sorting on x kicks in, only if the values for a are equal.
-
-                // thanks to @jpountz for the row ordering idea as a way to preserve ordering
-                @SuppressWarnings("unchecked")
-                @Override
-                protected boolean lessThan(Tuple<List<?>, Integer> l, Tuple<List<?>, Integer> r) {
-                    for (Tuple<Integer, Comparator> tuple : sortingColumns) {
-                        int i = tuple.v1().intValue();
-                        Comparator comparator = tuple.v2();
-
-                        Object vl = l.v1().get(i);
-                        Object vr = r.v1().get(i);
-                        if (comparator != null) {
-                            int result = comparator.compare(vl, vr);
-                            // if things are equals, move to the next comparator
-                            if (result != 0) {
-                                return result < 0;
-                            }
-                        }
-                        // no comparator means the existing order needs to be preserved
-                        else {
-                            // check the values - if they are equal move to the next comparator
-                            // otherwise return the row order
-                            if (Objects.equals(vl, vr) == false) {
-                                return l.v2().compareTo(r.v2()) < 0;
-                            }
-                        }
-                    }
-                    // everything is equal, fall-back to the row order
-                    return l.v2().compareTo(r.v2()) < 0;
-                }
-            };
+            this.data = new AggSortingQueue(size, sortingColumns);
         }
 
         @Override
@@ -231,9 +188,8 @@ public class Querier {
 
         private void doResponse(RowSet rowSet) {
            // 1. consume all pages received
-            if (consumeRowSet(rowSet) == false) {
-                return;
-            }
+            consumeRowSet(rowSet);
+
            Cursor cursor = rowSet.nextPageCursor();
            // 1a. trigger a next call if there's still data
            if (cursor != Cursor.EMPTY) {
@@ -248,31 +204,21 @@ public class Querier {
             sendResponse();
         }
 
-        private boolean consumeRowSet(RowSet rowSet) {
-            // use a synchronized block for visibility purposes (there's no concurrency)
+        private void consumeRowSet(RowSet rowSet) {
             ResultRowSet<?> rrs = (ResultRowSet<?>) rowSet;
-            synchronized (data) {
-                for (boolean hasRows = rrs.hasCurrentRow(); hasRows; hasRows = rrs.advanceRow()) {
-                    List<Object> row = new ArrayList<>(rrs.columnCount());
-                    rrs.forEachResultColumn(row::add);
-                    // if the queue overflows and no limit was specified, bail out
-                    if (data.insertWithOverflow(new Tuple<>(row, counter.getAndIncrement())) != null && noLimit) {
-                        onFailure(new SqlIllegalArgumentException(
-                                "The default limit [{}] for aggregate sorting has been reached; please specify a LIMIT"));
-                        return false;
-                    }
+            for (boolean hasRows = rrs.hasCurrentRow(); hasRows; hasRows = rrs.advanceRow()) {
+                List<Object> row = new ArrayList<>(rrs.columnCount());
+                rrs.forEachResultColumn(row::add);
+                // if the queue overflows and no limit was specified, throw an error
+                if (data.insertWithOverflow(new Tuple<>(row, counter.getAndIncrement())) != null && noLimit) {
+                    onFailure(new SqlIllegalArgumentException(
+                            "The default limit [{}] for aggregate sorting has been reached; please specify a LIMIT", MAXIMUM_SIZE));
                 }
             }
-            return true;
         }
 
         private void sendResponse() {
-            List<List<?>> list = new ArrayList<>(data.size());
-            Tuple<List<?>, Integer> pop = null;
-            while ((pop = data.pop()) != null) {
-                list.add(pop.v1());
-            }
-            listener.onResponse(new PagingListRowSet(schema, list, schema.size(), cfg.pageSize()));
+            listener.onResponse(new PagingListRowSet(schema, data.asList(), schema.size(), cfg.pageSize()));
         }
 
         @Override
@@ -373,7 +319,7 @@ public class Querier {
         @Override
         protected void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener) {
             // there are some results
-            if (response.getAggregations().asList().size() > 0) {
+            if (response.getAggregations().asList().isEmpty() == false) {
 
                 // retry
                 if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) {
@@ -383,7 +329,7 @@ public class Querier {
                 }
 
                 CompositeAggregationCursor.updateCompositeAfterKey(response, request.source());
-                byte[] nextSearch = null;
+                byte[] nextSearch;
                 try {
                     nextSearch = CompositeAggregationCursor.serializeQuery(request.source());
                 } catch (Exception ex) {
@@ -392,7 +338,8 @@ public class Querier {
                 }
 
                 listener.onResponse(
-                        new SchemaCompositeAggsRowSet(schema, initBucketExtractors(response), mask, response, query.limit(),
+                        new SchemaCompositeAggsRowSet(schema, initBucketExtractors(response), mask, response,
+                                query.sortingColumns().isEmpty() ? query.limit() : -1,
                                 nextSearch,
                                 query.shouldIncludeFrozen(),
                                 request.indices()));
@@ -626,4 +573,63 @@ public class Querier {
             listener.onFailure(ex);
         }
     }
+
+    @SuppressWarnings("rawtypes")
+    static class AggSortingQueue extends PriorityQueue<Tuple<List<?>, Integer>> {
+
+        private List<Tuple<Integer, Comparator>> sortingColumns;
+
+        AggSortingQueue(int maxSize, List<Tuple<Integer, Comparator>> sortingColumns) {
+            super(maxSize);
+            this.sortingColumns = sortingColumns;
+        }
+
+        // compare row based on the received attribute sort
+        // if a sort item is not in the list, it is assumed the sorting happened in ES
+        // and the results are left as is (by using the row ordering), otherwise it is sorted based on the given criteria.
+        //
+        // Take for example ORDER BY a, x, b, y
+        // a, b - are sorted in ES
+        // x, y - need to be sorted client-side
+        // sorting on x kicks in, only if the values for a are equal.
+
+        // thanks to @jpountz for the row ordering idea as a way to preserve ordering
+        @SuppressWarnings("unchecked")
+        @Override
+        protected boolean lessThan(Tuple<List<?>, Integer> l, Tuple<List<?>, Integer> r) {
+            for (Tuple<Integer, Comparator> tuple : sortingColumns) {
+                int i = tuple.v1().intValue();
+                Comparator comparator = tuple.v2();
+
+                Object vl = l.v1().get(i);
+                Object vr = r.v1().get(i);
+                if (comparator != null) {
+                    int result = comparator.compare(vl, vr);
+                    // if things are equals, move to the next comparator
+                    if (result != 0) {
+                        return result > 0;
+                    }
+                }
+                // no comparator means the existing order needs to be preserved
+                else {
+                    // check the values - if they are equal move to the next comparator
+                    // otherwise return the row order
+                    if (Objects.equals(vl, vr) == false) {
+                        return l.v2().compareTo(r.v2()) > 0;
+                    }
+                }
+            }
+            // everything is equal, fall-back to the row order
+            return l.v2().compareTo(r.v2()) > 0;
+        }
+
+        List<List<?>> asList() {
+            List<List<?>> list = new ArrayList<>(super.size());
+            Tuple<List<?>, Integer> pop;
+            while ((pop = pop()) != null) {
+                list.add(0, pop.v1());
+            }
+            return list;
+        }
+    }
 }
diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java
index 64177aeab68..3ec4ff6b114 100644
--- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java
+++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java
@@ -34,4 +34,4 @@ class SchemaCompositeAggsRowSet extends CompositeAggsRowSet implements SchemaRow
     public Schema schema() {
         return schema;
     }
-}
\ No newline at end of file
+}
diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SourceGenerator.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SourceGenerator.java
index 8d9e59617aa..4e343c1e54f 100644
--- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SourceGenerator.java
+++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SourceGenerator.java
@@ -80,9 +80,13 @@ public abstract class SourceGenerator {
         if (source.size() == -1) {
             source.size(sz);
         }
-        // limit the composite aggs only for non-local sorting
-        if (aggBuilder instanceof CompositeAggregationBuilder && container.sortingColumns().isEmpty()) {
-            ((CompositeAggregationBuilder) aggBuilder).size(sz);
+        if (aggBuilder instanceof CompositeAggregationBuilder) {
+            // limit the composite aggs only for non-local sorting
+            if (container.sortingColumns().isEmpty()) {
+                ((CompositeAggregationBuilder) aggBuilder).size(sz);
+            } else {
+                ((CompositeAggregationBuilder) aggBuilder).size(size);
+            }
         }
     }
 
diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursorTests.java
index 811a8ff4256..0baf470e4b3 100644
--- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursorTests.java
+++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursorTests.java
@@ -46,7 +46,8 @@ public class CompositeAggregationCursorTests extends AbstractWireSerializingTest
         return new CompositeAggregationCursor(instance.next(), instance.extractors(),
                 randomValueOtherThan(instance.mask(), () -> randomBitSet(instance.extractors().size())),
                 randomValueOtherThan(instance.limit(), () -> randomIntBetween(1, 512)),
-                randomBoolean(), instance.indices());
+                !instance.includeFrozen(),
+                instance.indices());
     }
 
     @Override
@@ -81,4 +82,4 @@ public class CompositeAggregationCursorTests extends AbstractWireSerializingTest
         }
         return mask;
     }
-}
\ No newline at end of file
+}
diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/QuerierTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/QuerierTests.java
new file mode 100644
index 00000000000..a6caad899dd
--- /dev/null
+++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/QuerierTests.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.sql.execution.search;
+
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.sql.execution.search.Querier.AggSortingQueue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+public class QuerierTests extends ESTestCase {
+
+    @SuppressWarnings("rawtypes")
+    public void testAggSortingAscending() {
+        Tuple<Integer, Comparator> tuple = new Tuple<>(0, Comparator.naturalOrder());
+        Querier.AggSortingQueue queue = new AggSortingQueue(10, Collections.singletonList(tuple));
+        for (int i = 50; i >= 0; i--) {
+            queue.insertWithOverflow(new Tuple<>(Collections.singletonList(i), i));
+        }
+        List<List<?>> results = queue.asList();
+
+        assertEquals(10, results.size());
+        for (int i = 0; i < 10; i ++) {
+            assertEquals(i, results.get(i).get(0));
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    public void testAggSortingDescending() {
+        Tuple<Integer, Comparator> tuple = new Tuple<>(0, Comparator.reverseOrder());
+        Querier.AggSortingQueue queue = new AggSortingQueue(10, Collections.singletonList(tuple));
+        for (int i = 0; i <= 50; i++) {
+            queue.insertWithOverflow(new Tuple<>(Collections.singletonList(i), i));
+        }
+        List<List<?>> results = queue.asList();
+
+        assertEquals(10, results.size());
+        for (int i = 0; i < 10; i ++) {
+            assertEquals(50 - i, results.get(i).get(0));
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    public void testAggSorting_TwoFields() {
+        List<Tuple<Integer, Comparator>> tuples = new ArrayList<>(2);
+        tuples.add(new Tuple<>(0, Comparator.reverseOrder()));
+        tuples.add(new Tuple<>(1, Comparator.naturalOrder()));
+        Querier.AggSortingQueue queue = new AggSortingQueue(10, tuples);
+
+        for (int i = 1; i <= 100; i++) {
+            queue.insertWithOverflow(new Tuple<>(Arrays.asList(i % 50 + 1, i), i));
+        }
+        List<List<?>> results = queue.asList();
+
+        assertEquals(10, results.size());
+        for (int i = 0; i < 10; i++) {
+            assertEquals(50 - (i / 2), results.get(i).get(0));
+            assertEquals(49 - (i / 2) + ((i % 2) * 50), results.get(i).get(1));
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    public void testAggSorting_Randomized() {
+        // Initialize comparators for fields (columns)
+        int noColumns = randomIntBetween(3, 10);
+        List<Tuple<Integer, Comparator>> tuples = new ArrayList<>(noColumns);
+        boolean[] ordering = new boolean[noColumns];
+        for (int j = 0; j < noColumns; j++) {
+            boolean order = randomBoolean();
+            ordering[j] = order;
+            tuples.add(new Tuple<>(j, order ? Comparator.naturalOrder() : Comparator.reverseOrder()));
+        }
+
+        // Insert random no of documents (rows) with random 0/1 values for each field
+        int noDocs = randomIntBetween(10, 50);
+        int queueSize = randomIntBetween(4, noDocs / 2);
+        List<List<Integer>> expected = new ArrayList<>(noDocs);
+        Querier.AggSortingQueue queue = new AggSortingQueue(queueSize, tuples);
+        for (int i = 0; i < noDocs; i++) {
+            List<Integer> values = new ArrayList<>(noColumns);
+            for (int j = 0; j < noColumns; j++) {
+                values.add(randomBoolean() ? 1 : 0);
+            }
+            queue.insertWithOverflow(new Tuple<>(values, i));
+            expected.add(values);
+        }
+
+        List<List<?>> results = queue.asList();
+        assertEquals(queueSize, results.size());
+        expected.sort((o1, o2) -> {
+            for (int j = 0; j < noColumns; j++) {
+                if (o1.get(j) < o2.get(j)) {
+                    return ordering[j] ? -1 : 1;
+                } else if (o1.get(j) > o2.get(j)) {
+                    return ordering[j] ? 1 : -1;
+                }
+            }
+            return 0;
+        });
+        assertEquals(expected.subList(0, queueSize), results);
+    }
+}
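
Note for reviewers (not part of the patch): the heart of the change is that when ORDER BY refers to an aggregate, the composite aggregation cannot be sorted globally by Elasticsearch, so the Querier now walks every page of buckets (the row set is created with a limit of -1) and keeps at most LIMIT rows in Querier.AggSortingQueue, with the LIMIT capped up front at 512. The sketch below restates that top-N mechanism in isolation, assuming plain java.util.PriorityQueue and a hypothetical Row holder instead of the Lucene PriorityQueue and Tuple<List<?>, Integer> pairs the real class uses; the class and variable names (LocalAggSortingSketch, Row, pages) are illustrative only, and the 512 constant simply mirrors the cap enforced by LocalAggregationSorterListener.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustration only: the client-side top-N idea behind Querier.AggSortingQueue,
// rewritten with java.util types (names and structure here are hypothetical).
public class LocalAggSortingSketch {

    // mirrors the 512 hard cap checked before local aggregate sorting starts
    static final int MAXIMUM_SIZE = 512;

    static class Row {
        final List<Object> values;   // one extracted row (already grouped/aggregated by ES)
        final int arrivalOrder;      // position in which the row was received; used as tie-breaker

        Row(List<Object> values, int arrivalOrder) {
            this.values = values;
            this.arrivalOrder = arrivalOrder;
        }
    }

    public static void main(String[] args) {
        int limit = 3;               // the user's LIMIT
        if (limit > MAXIMUM_SIZE) {
            throw new IllegalArgumentException(
                    "The maximum LIMIT for aggregate sorting is [" + MAXIMUM_SIZE + "], received [" + limit + "]");
        }

        // ORDER BY <column 0> DESC, falling back to arrival order so the ordering ES produced survives ties
        Comparator<Row> order = Comparator
                .comparing((Row r) -> (Integer) r.values.get(0), Comparator.reverseOrder())
                .thenComparingInt(r -> r.arrivalOrder);

        // keep at most `limit` rows; the head of the heap is the worst row currently retained
        PriorityQueue<Row> queue = new PriorityQueue<>(order.reversed());

        int counter = 0;
        int[][] pages = { { 7, 3 }, { 9, 1 }, { 9, 5 } };   // stand-in for successive composite-agg pages
        for (int[] page : pages) {
            for (int value : page) {
                queue.offer(new Row(Arrays.asList((Object) value), counter++));
                if (queue.size() > limit) {
                    queue.poll();    // overflow: drop the worst row, analogous to insertWithOverflow
                }
            }
        }

        List<Row> topN = new ArrayList<>(queue);
        topN.sort(order);
        topN.forEach(r -> System.out.println(r.values));    // prints [9], [9], [7]
    }
}

Using the arrival index as the final tie-breaker is what lets the queue preserve the ordering Elasticsearch already applied to the non-aggregate sort keys, which is the same row-ordering trick described in the AggSortingQueue comment above.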