SQL: Fix wrong results when sorting on aggregate (#43154)

- Previously, when sorting on an aggregate function, bucket
processing ended early when the explicit (LIMIT XXX) or the implicit
limit of 512 was reached. As a consequence, only a subset of the
grouping buckets was processed and the results returned didn't reflect
the global ordering (see the example below).

- Previously, the priority queue sorting method had an inverted
comparison check, and the final response from the priority queue was
also returned in reverse order because of the calls to the `pop()`
method (see the sketch below).
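
To illustrate the first issue (on a hypothetical index t with more than
512 grouping buckets), a query such as

SELECT MAX(a) FROM t GROUP BY b ORDER BY MAX(a) DESC LIMIT 10

previously stopped after the first 512 buckets (in composite-aggregation
order), so the ten rows returned were only the best among the buckets
seen, not the global top ten.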
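
The queue fix can be summarized with a minimal sketch, assuming the
semantics of Lucene's org.apache.lucene.util.PriorityQueue, which the
AggSortingQueue in the diff below appears to extend: pop() always returns
the lessThan-smallest element first and insertWithOverflow() evicts it
when the queue is full, so keeping the top-N rows requires treating the
row that sorts last as the "least" one, and the popped elements must be
prepended to restore the requested order. TopNQueue is a hypothetical
single-column stand-in:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.lucene.util.PriorityQueue;

// Keeps the N smallest integers and returns them in ascending order;
// mirrors the corrected AggSortingQueue logic for a single column.
class TopNQueue extends PriorityQueue<Integer> {

    private static final Comparator<Integer> ASC = Comparator.naturalOrder();

    TopNQueue(int maxSize) {
        super(maxSize);
    }

    @Override
    protected boolean lessThan(Integer a, Integer b) {
        // Inverted on purpose: the element that sorts LAST is the heap's
        // "least" entry, so insertWithOverflow() evicts it first.
        return ASC.compare(a, b) > 0;
    }

    List<Integer> asList() {
        // pop() yields the kept elements worst-first; prepending re-reverses
        // them so the final list follows the requested (ascending) order.
        List<Integer> list = new ArrayList<>(size());
        Integer popped;
        while ((popped = pop()) != null) {
            list.add(0, popped);
        }
        return list;
    }
}

Inserting the values 0..50 keeps the ten smallest and asList() returns
[0, 1, ..., 9]; appending in pop() order instead of prepending would
return them reversed, which is the second bug described above.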

Fixes: #42851

(cherry picked from commit 19909edcfdf5792b38c1363b07379783ebd0e6c4)
Marios Trivyzas 2019-06-13 21:21:32 +02:00
parent 55dba6ffad
commit 3c73602524
17 changed files with 299 additions and 173 deletions

View File

@@ -32,7 +32,7 @@ public class JdbcCsvSpecIT extends CsvSpecTestCase {
@Override
protected int fetchSize() {
// using a smaller fetchSize for nested documents' tests to uncover bugs
// similar with https://github.com/elastic/elasticsearch/issues/35176 quicker
// similar to https://github.com/elastic/elasticsearch/issues/35176 quicker
return fileName.startsWith("nested") && randomBoolean() ? randomIntBetween(1,5) : super.fetchSize();
}
}

View File

@@ -21,4 +21,5 @@ public interface ErrorsTestCase {
void testSelectGroupByScore() throws Exception;
void testSelectScoreSubField() throws Exception;
void testSelectScoreInScalar() throws Exception;
void testHardLimitForSortOnAggregate() throws Exception;
}

View File

@@ -56,8 +56,8 @@ public abstract class CliIntegrationTestCase extends ESRestTestCase {
return null;
}
protected void index(String index, CheckedConsumer<XContentBuilder, IOException> body) throws IOException {
Request request = new Request("PUT", "/" + index + "/_doc/1");
protected void index(String index, int docId, CheckedConsumer<XContentBuilder, IOException> body) throws IOException {
Request request = new Request("PUT", "/" + index + "/_doc/" + docId);
request.addParameter("refresh", "true");
XContentBuilder builder = JsonXContent.contentBuilder().startObject();
body.accept(builder);
@@ -66,6 +66,10 @@ public abstract class CliIntegrationTestCase extends ESRestTestCase {
client().performRequest(request);
}
protected void index(String index, CheckedConsumer<XContentBuilder, IOException> body) throws IOException {
index(index, 1, body);
}
public String command(String command) throws IOException {
return cli.command(command);
}

View File

@@ -97,8 +97,15 @@ public abstract class ErrorsTestCase extends CliIntegrationTestCase implements o
assertEquals("line 1:12: [SCORE()] cannot be an argument to a function" + END, readLine());
}
@Override
public void testHardLimitForSortOnAggregate() throws Exception {
index("test", body -> body.field("a", 1).field("b", 2));
String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000");
assertEquals(START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [512], received [10000]" + END,
commandResult);
}
public static void assertFoundOneProblem(String commandResult) {
assertEquals(START + "Bad request [[3;33;22mFound 1 problem(s)", commandResult);
}
}

View File

@@ -49,4 +49,32 @@ public abstract class FetchSizeTestCase extends CliIntegrationTestCase {
assertEquals(ErrorsTestCase.START + "Invalid fetch size [[3;33;22m" + Long.MAX_VALUE + ErrorsTestCase.END,
command("fetch size = " + Long.MAX_VALUE));
}
// Test for issue: https://github.com/elastic/elasticsearch/issues/42851
// Even though fetch size and limit are smaller than the number of rows, all buckets
// should be processed to achieve the global ordering of the aggregate function.
public void testOrderingOnAggregate() throws IOException {
Request request = new Request("PUT", "/test/_bulk");
request.addParameter("refresh", "true");
StringBuilder bulk = new StringBuilder();
for (int i = 1; i <= 100; i++) {
bulk.append("{\"index\":{}}\n");
bulk.append("{\"a\":").append(i).append(", \"b\" : ").append(i).append("}\n");
}
request.setJsonEntity(bulk.toString());
client().performRequest(request);
assertEquals("[?1l>[?1000l[?2004lfetch size set to [90m4[0m", command("fetch size = 4"));
assertEquals("[?1l>[?1000l[?2004lfetch separator set to \"[90m -- fetch sep -- [0m\"",
command("fetch separator = \" -- fetch sep -- \""));
assertThat(command("SELECT max(b) FROM test GROUP BY a ORDER BY max(b) DESC LIMIT 20"), containsString("max(b)"));
assertThat(readLine(), containsString("----------"));
for (int i = 100; i > 80; i--) {
if (i < 100 && i % 4 == 0) {
assertThat(readLine(), containsString(" -- fetch sep -- "));
}
assertThat(readLine(), containsString(Integer.toString(i)));
}
assertEquals("", readLine());
}
}

View File

@@ -1,64 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.qa.jdbc;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.sql.qa.jdbc.CsvTestUtils.CsvTestCase;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import static org.elasticsearch.xpack.sql.qa.jdbc.CsvTestUtils.csvConnection;
import static org.elasticsearch.xpack.sql.qa.jdbc.CsvTestUtils.executeCsvQuery;
import static org.elasticsearch.xpack.sql.qa.jdbc.CsvTestUtils.specParser;
@TestLogging("org.elasticsearch.xpack.sql:TRACE")
public abstract class DebugCsvSpec extends SpecBaseIntegrationTestCase {
private final CsvTestCase testCase;
@ParametersFactory(shuffle = false, argumentFormatting = SqlSpecTestCase.PARAM_FORMATTING)
public static List<Object[]> readScriptSpec() throws Exception {
Parser parser = specParser();
return readScriptSpec("/debug.csv-spec", parser);
}
public DebugCsvSpec(String fileName, String groupName, String testName, Integer lineNumber, CsvTestCase testCase) {
super(fileName, groupName, testName, lineNumber);
this.testCase = testCase;
}
@Override
protected void assertResults(ResultSet expected, ResultSet elastic) throws SQLException {
Logger log = logEsResultSet() ? logger : null;
//
// uncomment this to printout the result set and create new CSV tests
//
JdbcTestUtils.logResultSetMetadata(elastic, log);
JdbcTestUtils.logResultSetData(elastic, log);
//JdbcAssert.assertResultSets(expected, elastic, log);
}
@Override
protected boolean logEsResultSet() {
return true;
}
@Override
protected final void doTest() throws Throwable {
try (Connection csv = csvConnection(testCase); Connection es = esJdbc()) {
// pass the testName as table for debugging purposes (in case the underlying reader is missing)
ResultSet expected = executeCsvQuery(csv, testName);
ResultSet elasticResults = executeJdbcQuery(es, testCase.query);
assertResults(expected, elasticResults);
}
}
}

View File

@@ -116,4 +116,14 @@ public class ErrorsTestCase extends JdbcIntegrationTestCase implements org.elast
assertThat(e.getMessage(), startsWith("Found 1 problem(s)\nline 1:12: [SCORE()] cannot be an argument to a function"));
}
}
@Override
public void testHardLimitForSortOnAggregate() throws Exception {
index("test", body -> body.field("a", 1).field("b", 2));
try (Connection c = esJdbc()) {
SQLException e = expectThrows(SQLException.class, () ->
c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000").executeQuery());
assertEquals("The maximum LIMIT for aggregate sorting is [512], received [10000]", e.getMessage());
}
}
}

View File

@@ -65,6 +65,13 @@ public abstract class SqlSpecTestCase extends SpecBaseIntegrationTestCase {
this.query = query;
}
@Override
protected int fetchSize() {
// using a smaller fetchSize for nested documents' tests to uncover bugs
// similar to https://github.com/elastic/elasticsearch/issues/42581
return randomIntBetween(1, 20);
}
@Override
protected final void doTest() throws Throwable {
// we skip the tests in case of these locales because ES-SQL is Locale-insensitive for now

View File

@@ -6,7 +6,6 @@
package org.elasticsearch.xpack.sql.qa.rest;
import com.fasterxml.jackson.core.io.JsonStringEncoder;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
@@ -314,7 +313,14 @@ public abstract class RestSqlTestCase extends ESRestTestCase implements ErrorsTe
expectBadRequest(() -> runSql(randomMode(), "SELECT SIN(SCORE()) FROM test"),
containsString("line 1:12: [SCORE()] cannot be an argument to a function"));
}
@Override
public void testHardLimitForSortOnAggregate() throws Exception {
index("{\"a\": 1, \"b\": 2}");
expectBadRequest(() -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000"),
containsString("The maximum LIMIT for aggregate sorting is [512], received [10000]"));
}
public void testUseColumnarForUnsupportedFormats() throws Exception {
String format = randomFrom("txt", "csv", "tsv");
index("{\"foo\":1}");

View File

@@ -99,13 +99,13 @@ aggNotSpecifiedWithHavingOnLargeGroupBy
SELECT MAX(salary) AS max FROM test_emp GROUP BY emp_no HAVING AVG(salary) > 1000 ORDER BY MIN(salary);
aggWithTieBreakerDescAsc
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no ASC;
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no ASC LIMIT 50;
aggWithTieBreakerDescDesc
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no DESC;
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no DESC LIMIT 50;
aggWithTieBreakerAscDesc
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MAX(languages) ASC NULLS FIRST, emp_no DESC;
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MAX(languages) ASC NULLS FIRST, emp_no DESC LIMIT 50;
aggWithMixOfOrdinals
SELECT gender AS g, MAX(salary) AS m FROM test_emp GROUP BY gender ORDER BY 2 DESC LIMIT 3;

View File

@@ -143,9 +143,9 @@ public class CompositeAggregationCursor implements Cursor {
return;
}
updateCompositeAfterKey(r, query);
CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit, serializeQuery(query), includeFrozen,
indices);
boolean hasAfterKey = updateCompositeAfterKey(r, query);
CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit,
hasAfterKey ? serializeQuery(query) : null, includeFrozen, indices);
listener.onResponse(rowSet);
} catch (Exception ex) {
listener.onFailure(ex);
@@ -178,7 +178,7 @@ public class CompositeAggregationCursor implements Cursor {
throw new SqlIllegalArgumentException("Unrecognized root group found; {}", agg.getClass());
}
static void updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) {
static boolean updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) {
CompositeAggregation composite = getComposite(r);
if (composite == null) {
@@ -187,22 +187,25 @@ public class CompositeAggregationCursor implements Cursor {
Map<String, Object> afterKey = composite.afterKey();
// a null after-key means done
if (afterKey != null) {
AggregationBuilder aggBuilder = next.aggregations().getAggregatorFactories().iterator().next();
// update after-key with the new value
if (aggBuilder instanceof CompositeAggregationBuilder) {
CompositeAggregationBuilder comp = (CompositeAggregationBuilder) aggBuilder;
comp.aggregateAfter(afterKey);
} else {
throw new SqlIllegalArgumentException("Invalid client request; expected a group-by but instead got {}", aggBuilder);
}
if (afterKey == null) {
return false;
}
AggregationBuilder aggBuilder = next.aggregations().getAggregatorFactories().iterator().next();
// update after-key with the new value
if (aggBuilder instanceof CompositeAggregationBuilder) {
CompositeAggregationBuilder comp = (CompositeAggregationBuilder) aggBuilder;
comp.aggregateAfter(afterKey);
return true;
} else {
throw new SqlIllegalArgumentException("Invalid client request; expected a group-by but instead got {}", aggBuilder);
}
}
/**
* Deserializes the search source from a byte array.
*/
static SearchSourceBuilder deserializeQuery(NamedWriteableRegistry registry, byte[] source) throws IOException {
private static SearchSourceBuilder deserializeQuery(NamedWriteableRegistry registry, byte[] source) throws IOException {
try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(source), registry)) {
return new SearchSourceBuilder(in);
}

View File

@@ -28,8 +28,8 @@ class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {
private final int size;
private int row = 0;
CompositeAggsRowSet(List<BucketExtractor> exts, BitSet mask, SearchResponse response, int limit, byte[] next,
boolean includeFrozen, String... indices) {
CompositeAggsRowSet(List<BucketExtractor> exts, BitSet mask, SearchResponse response,
int limit, byte[] next, boolean includeFrozen, String... indices) {
super(exts, mask);
CompositeAggregation composite = CompositeAggregationCursor.getComposite(response);
@@ -40,18 +40,22 @@ class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {
}
// page size
size = limit < 0 ? buckets.size() : Math.min(buckets.size(), limit);
size = limit == -1 ? buckets.size() : Math.min(buckets.size(), limit);
if (next == null) {
cursor = Cursor.EMPTY;
} else {
// compute remaining limit
int remainingLimit = limit - size;
// Compute remaining limit
// If the limit is -1 then we have a local sorting (sort on aggregate function) that requires all the buckets
// to be processed so we stop only when all data is exhausted.
int remainingLimit = (limit == -1) ? limit : ((limit - size) >= 0 ? (limit - size) : 0);
// if the computed limit is zero, or the size is zero it means either there's nothing left or the limit has been reached
// note that a composite agg might be valid but return zero groups (since these can be filtered with HAVING/bucket selector)
// however the Querier takes care of that and keeps making requests until either the query is invalid or at least one response
// is returned
if (next == null || size == 0 || remainingLimit == 0) {
// is returned.
if (size == 0 || remainingLimit == 0) {
cursor = Cursor.EMPTY;
} else {
cursor = new CompositeAggregationCursor(next, exts, mask, remainingLimit, includeFrozen, indices);
@@ -92,4 +96,4 @@ class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {
public Cursor nextPageCursor() {
return cursor;
}
}
}

View File

@@ -117,7 +117,6 @@ public class Querier {
listener = sortingColumns.isEmpty() ? listener : new LocalAggregationSorterListener(listener, sortingColumns, query.limit());
ActionListener<SearchResponse> l = null;
if (query.isAggsOnly()) {
if (query.aggs().useImplicitGroupBy()) {
l = new ImplicitGroupActionListener(listener, client, cfg, output, query, search);
@@ -134,7 +133,7 @@
public static SearchRequest prepareRequest(Client client, SearchSourceBuilder source, TimeValue timeout, boolean includeFrozen,
String... indices) {
SearchRequest search = client.prepareSearch(indices)
return client.prepareSearch(indices)
// always track total hits accurately
.setTrackTotalHits(true)
.setAllowPartialSearchResults(false)
@@ -143,7 +142,6 @@
.setIndicesOptions(
includeFrozen ? IndexResolver.FIELD_CAPS_FROZEN_INDICES_OPTIONS : IndexResolver.FIELD_CAPS_INDICES_OPTIONS)
.request();
return search;
}
/**
@@ -158,7 +156,7 @@
private final ActionListener<SchemaRowSet> listener;
// keep the top N entries.
private final PriorityQueue<Tuple<List<?>, Integer>> data;
private final AggSortingQueue data;
private final AtomicInteger counter = new AtomicInteger();
private volatile Schema schema;
@@ -174,53 +172,12 @@
} else {
noLimit = false;
if (limit > MAXIMUM_SIZE) {
throw new PlanningException("The maximum LIMIT for aggregate sorting is [{}], received [{}]", limit, MAXIMUM_SIZE);
throw new PlanningException("The maximum LIMIT for aggregate sorting is [{}], received [{}]", MAXIMUM_SIZE, limit);
} else {
size = limit;
}
}
this.data = new PriorityQueue<Tuple<List<?>, Integer>>(size) {
// compare row based on the received attribute sort
// if a sort item is not in the list, it is assumed the sorting happened in ES
// and the results are left as is (by using the row ordering), otherwise it is sorted based on the given criteria.
//
// Take for example ORDER BY a, x, b, y
// a, b - are sorted in ES
// x, y - need to be sorted client-side
// sorting on x kicks in, only if the values for a are equal.
// thanks to @jpountz for the row ordering idea as a way to preserve ordering
@SuppressWarnings("unchecked")
@Override
protected boolean lessThan(Tuple<List<?>, Integer> l, Tuple<List<?>, Integer> r) {
for (Tuple<Integer, Comparator> tuple : sortingColumns) {
int i = tuple.v1().intValue();
Comparator comparator = tuple.v2();
Object vl = l.v1().get(i);
Object vr = r.v1().get(i);
if (comparator != null) {
int result = comparator.compare(vl, vr);
// if things are equals, move to the next comparator
if (result != 0) {
return result < 0;
}
}
// no comparator means the existing order needs to be preserved
else {
// check the values - if they are equal move to the next comparator
// otherwise return the row order
if (Objects.equals(vl, vr) == false) {
return l.v2().compareTo(r.v2()) < 0;
}
}
}
// everything is equal, fall-back to the row order
return l.v2().compareTo(r.v2()) < 0;
}
};
this.data = new AggSortingQueue(size, sortingColumns);
}
@Override
@@ -231,9 +188,8 @@
private void doResponse(RowSet rowSet) {
// 1. consume all pages received
if (consumeRowSet(rowSet) == false) {
return;
}
consumeRowSet(rowSet);
Cursor cursor = rowSet.nextPageCursor();
// 1a. trigger a next call if there's still data
if (cursor != Cursor.EMPTY) {
@@ -248,31 +204,21 @@
sendResponse();
}
private boolean consumeRowSet(RowSet rowSet) {
// use a synchronized block for visibility purposes (there's no concurrency)
private void consumeRowSet(RowSet rowSet) {
ResultRowSet<?> rrs = (ResultRowSet<?>) rowSet;
synchronized (data) {
for (boolean hasRows = rrs.hasCurrentRow(); hasRows; hasRows = rrs.advanceRow()) {
List<Object> row = new ArrayList<>(rrs.columnCount());
rrs.forEachResultColumn(row::add);
// if the queue overflows and no limit was specified, bail out
if (data.insertWithOverflow(new Tuple<>(row, counter.getAndIncrement())) != null && noLimit) {
onFailure(new SqlIllegalArgumentException(
"The default limit [{}] for aggregate sorting has been reached; please specify a LIMIT"));
return false;
}
for (boolean hasRows = rrs.hasCurrentRow(); hasRows; hasRows = rrs.advanceRow()) {
List<Object> row = new ArrayList<>(rrs.columnCount());
rrs.forEachResultColumn(row::add);
// if the queue overflows and no limit was specified, throw an error
if (data.insertWithOverflow(new Tuple<>(row, counter.getAndIncrement())) != null && noLimit) {
onFailure(new SqlIllegalArgumentException(
"The default limit [{}] for aggregate sorting has been reached; please specify a LIMIT", MAXIMUM_SIZE));
}
}
return true;
}
private void sendResponse() {
List<List<?>> list = new ArrayList<>(data.size());
Tuple<List<?>, Integer> pop = null;
while ((pop = data.pop()) != null) {
list.add(pop.v1());
}
listener.onResponse(new PagingListRowSet(schema, list, schema.size(), cfg.pageSize()));
listener.onResponse(new PagingListRowSet(schema, data.asList(), schema.size(), cfg.pageSize()));
}
@Override
@@ -373,7 +319,7 @@
@Override
protected void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener) {
// there are some results
if (response.getAggregations().asList().size() > 0) {
if (response.getAggregations().asList().isEmpty() == false) {
// retry
if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) {
@@ -383,7 +329,7 @@
}
CompositeAggregationCursor.updateCompositeAfterKey(response, request.source());
byte[] nextSearch = null;
byte[] nextSearch;
try {
nextSearch = CompositeAggregationCursor.serializeQuery(request.source());
} catch (Exception ex) {
@@ -392,7 +338,8 @@
}
listener.onResponse(
new SchemaCompositeAggsRowSet(schema, initBucketExtractors(response), mask, response, query.limit(),
new SchemaCompositeAggsRowSet(schema, initBucketExtractors(response), mask, response,
query.sortingColumns().isEmpty() ? query.limit() : -1,
nextSearch,
query.shouldIncludeFrozen(),
request.indices()));
@@ -626,4 +573,63 @@
listener.onFailure(ex);
}
}
@SuppressWarnings("rawtypes")
static class AggSortingQueue extends PriorityQueue<Tuple<List<?>, Integer>> {
private List<Tuple<Integer, Comparator>> sortingColumns;
AggSortingQueue(int maxSize, List<Tuple<Integer, Comparator>> sortingColumns) {
super(maxSize);
this.sortingColumns = sortingColumns;
}
// compare row based on the received attribute sort
// if a sort item is not in the list, it is assumed the sorting happened in ES
// and the results are left as is (by using the row ordering), otherwise it is sorted based on the given criteria.
//
// Take for example ORDER BY a, x, b, y
// a, b - are sorted in ES
// x, y - need to be sorted client-side
// sorting on x kicks in, only if the values for a are equal.
// thanks to @jpountz for the row ordering idea as a way to preserve ordering
@SuppressWarnings("unchecked")
@Override
protected boolean lessThan(Tuple<List<?>, Integer> l, Tuple<List<?>, Integer> r) {
for (Tuple<Integer, Comparator> tuple : sortingColumns) {
int i = tuple.v1().intValue();
Comparator comparator = tuple.v2();
Object vl = l.v1().get(i);
Object vr = r.v1().get(i);
if (comparator != null) {
int result = comparator.compare(vl, vr);
// if things are equals, move to the next comparator
if (result != 0) {
return result > 0;
}
}
// no comparator means the existing order needs to be preserved
else {
// check the values - if they are equal move to the next comparator
// otherwise return the row order
if (Objects.equals(vl, vr) == false) {
return l.v2().compareTo(r.v2()) > 0;
}
}
}
// everything is equal, fall-back to the row order
return l.v2().compareTo(r.v2()) > 0;
}
List<List<?>> asList() {
List<List<?>> list = new ArrayList<>(super.size());
Tuple<List<?>, Integer> pop;
while ((pop = pop()) != null) {
list.add(0, pop.v1());
}
return list;
}
}
}

View File

@@ -34,4 +34,4 @@ class SchemaCompositeAggsRowSet extends CompositeAggsRowSet implements SchemaRow
public Schema schema() {
return schema;
}
}
}

View File

@@ -80,9 +80,13 @@ public abstract class SourceGenerator {
if (source.size() == -1) {
source.size(sz);
}
// limit the composite aggs only for non-local sorting
if (aggBuilder instanceof CompositeAggregationBuilder && container.sortingColumns().isEmpty()) {
((CompositeAggregationBuilder) aggBuilder).size(sz);
if (aggBuilder instanceof CompositeAggregationBuilder) {
// limit the composite aggs only for non-local sorting
if (container.sortingColumns().isEmpty()) {
((CompositeAggregationBuilder) aggBuilder).size(sz);
} else {
((CompositeAggregationBuilder) aggBuilder).size(size);
}
}
}

View File

@@ -46,7 +46,8 @@ public class CompositeAggregationCursorTests extends AbstractWireSerializingTest
return new CompositeAggregationCursor(instance.next(), instance.extractors(),
randomValueOtherThan(instance.mask(), () -> randomBitSet(instance.extractors().size())),
randomValueOtherThan(instance.limit(), () -> randomIntBetween(1, 512)),
randomBoolean(), instance.indices());
!instance.includeFrozen(),
instance.indices());
}
@Override
@@ -81,4 +82,4 @@ public class CompositeAggregationCursorTests extends AbstractWireSerializingTest
}
return mask;
}
}
}

View File

@@ -0,0 +1,109 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.execution.search.Querier.AggSortingQueue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
public class QuerierTests extends ESTestCase {
@SuppressWarnings("rawtypes")
public void testAggSortingAscending() {
Tuple<Integer, Comparator> tuple = new Tuple<>(0, Comparator.naturalOrder());
Querier.AggSortingQueue queue = new AggSortingQueue(10, Collections.singletonList(tuple));
for (int i = 50; i >= 0; i--) {
queue.insertWithOverflow(new Tuple<>(Collections.singletonList(i), i));
}
List<List<?>> results = queue.asList();
assertEquals(10, results.size());
for (int i = 0; i < 10; i ++) {
assertEquals(i, results.get(i).get(0));
}
}
@SuppressWarnings("rawtypes")
public void testAggSortingDescending() {
Tuple<Integer, Comparator> tuple = new Tuple<>(0, Comparator.reverseOrder());
Querier.AggSortingQueue queue = new AggSortingQueue(10, Collections.singletonList(tuple));
for (int i = 0; i <= 50; i++) {
queue.insertWithOverflow(new Tuple<>(Collections.singletonList(i), i));
}
List<List<?>> results = queue.asList();
assertEquals(10, results.size());
for (int i = 0; i < 10; i ++) {
assertEquals(50 - i, results.get(i).get(0));
}
}
@SuppressWarnings("rawtypes")
public void testAggSorting_TwoFields() {
List<Tuple<Integer, Comparator>> tuples = new ArrayList<>(2);
tuples.add(new Tuple<>(0, Comparator.reverseOrder()));
tuples.add(new Tuple<>(1, Comparator.naturalOrder()));
Querier.AggSortingQueue queue = new AggSortingQueue(10, tuples);
for (int i = 1; i <= 100; i++) {
queue.insertWithOverflow(new Tuple<>(Arrays.asList(i % 50 + 1, i), i));
}
List<List<?>> results = queue.asList();
assertEquals(10, results.size());
for (int i = 0; i < 10; i++) {
assertEquals(50 - (i / 2), results.get(i).get(0));
assertEquals(49 - (i / 2) + ((i % 2) * 50), results.get(i).get(1));
}
}
@SuppressWarnings("rawtypes")
public void testAggSorting_Randomized() {
// Initialize comparators for fields (columns)
int noColumns = randomIntBetween(3, 10);
List<Tuple<Integer, Comparator>> tuples = new ArrayList<>(noColumns);
boolean[] ordering = new boolean[noColumns];
for (int j = 0; j < noColumns; j++) {
boolean order = randomBoolean();
ordering[j] = order;
tuples.add(new Tuple<>(j, order ? Comparator.naturalOrder() : Comparator.reverseOrder()));
}
// Insert random no of documents (rows) with random 0/1 values for each field
int noDocs = randomIntBetween(10, 50);
int queueSize = randomIntBetween(4, noDocs / 2);
List<List<Integer>> expected = new ArrayList<>(noDocs);
Querier.AggSortingQueue queue = new AggSortingQueue(queueSize, tuples);
for (int i = 0; i < noDocs; i++) {
List<Integer> values = new ArrayList<>(noColumns);
for (int j = 0; j < noColumns; j++) {
values.add(randomBoolean() ? 1 : 0);
}
queue.insertWithOverflow(new Tuple<>(values, i));
expected.add(values);
}
List<List<?>> results = queue.asList();
assertEquals(queueSize, results.size());
expected.sort((o1, o2) -> {
for (int j = 0; j < noColumns; j++) {
if (o1.get(j) < o2.get(j)) {
return ordering[j] ? -1 : 1;
} else if (o1.get(j) > o2.get(j)) {
return ordering[j] ? 1 : -1;
}
}
return 0;
});
assertEquals(expected.subList(0, queueSize), results);
}
}