SQL: Refactor away the cycle between Rowset and Cursor (#45516)

Improve encapsulation of pagination of rowsets by breaking the cycle
between cursor and associated rowset implementation, all logic now
residing inside each cursor implementation.

(cherry picked from commit be8fe0a0ce562fe732fae12a0b236b5731e4638c)
This commit is contained in:
Costin Leau 2019-08-16 15:39:36 +03:00 committed by Costin Leau
parent ecb3ebd796
commit 96883dd028
42 changed files with 378 additions and 353 deletions

View File

@ -23,8 +23,7 @@ import org.elasticsearch.xpack.sql.planner.PlanningException;
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.stats.Metrics;
import org.elasticsearch.xpack.sql.stats.QueryMetric;
@ -91,7 +90,7 @@ public class PlanExecutor {
}, listener::onFailure));
}
public void sql(Configuration cfg, String sql, List<SqlTypedParamValue> params, ActionListener<SchemaRowSet> listener) {
public void sql(Configuration cfg, String sql, List<SqlTypedParamValue> params, ActionListener<Page> listener) {
QueryMetric metric = QueryMetric.from(cfg.mode(), cfg.clientId());
metrics.total(metric);
@ -101,7 +100,7 @@ public class PlanExecutor {
}));
}
public void nextPage(Configuration cfg, Cursor cursor, ActionListener<RowSet> listener) {
public void nextPage(Configuration cfg, Cursor cursor, ActionListener<Page> listener) {
QueryMetric metric = QueryMetric.from(cfg.mode(), cfg.clientId());
metrics.total(metric);
metrics.paging(metric);

View File

@ -27,7 +27,8 @@ import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractor;
import org.elasticsearch.xpack.sql.querydsl.agg.Aggs;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.type.Schema;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.io.IOException;
@ -36,6 +37,8 @@ import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* Cursor for composite aggregation (GROUP BY).
@ -116,7 +119,7 @@ public class CompositeAggregationCursor implements Cursor {
}
@Override
public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<RowSet> listener) {
public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<Page> listener) {
SearchSourceBuilder q;
try {
q = deserializeQuery(registry, nextQuery);
@ -135,21 +138,11 @@ public class CompositeAggregationCursor implements Cursor {
client.search(search, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse r) {
try {
// retry
if (shouldRetryDueToEmptyPage(r)) {
CompositeAggregationCursor.updateCompositeAfterKey(r, search.source());
client.search(search, this);
return;
}
boolean hasAfterKey = updateCompositeAfterKey(r, query);
CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit,
hasAfterKey ? serializeQuery(query) : null, includeFrozen, indices);
listener.onResponse(rowSet);
} catch (Exception ex) {
listener.onFailure(ex);
}
handle(r, search.source(), ba -> new CompositeAggsRowSet(extractors, mask, r, limit, ba),
() -> client.search(search, this),
p -> listener.onResponse(p),
e -> listener.onFailure(e),
Schema.EMPTY, includeFrozen, indices);
}
@Override
@ -159,6 +152,39 @@ public class CompositeAggregationCursor implements Cursor {
});
}
static void handle(SearchResponse response, SearchSourceBuilder source, Function<byte[], CompositeAggsRowSet> makeRowSet,
Runnable retry, Consumer<Page> onPage, Consumer<Exception> onFailure,
Schema schema, boolean includeFrozen, String[] indices) {
// there are some results
if (response.getAggregations().asList().isEmpty() == false) {
// retry
if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) {
CompositeAggregationCursor.updateCompositeAfterKey(response, source);
retry.run();
return;
}
try {
boolean hasAfterKey = updateCompositeAfterKey(response, source);
byte[] queryAsBytes = hasAfterKey ? serializeQuery(source) : null;
CompositeAggsRowSet rowSet = makeRowSet.apply(queryAsBytes);
Cursor next = rowSet.remainingData() == 0
? Cursor.EMPTY
: new CompositeAggregationCursor(queryAsBytes, rowSet.extractors(), rowSet.mask(),
rowSet.remainingData(), includeFrozen, indices);
onPage.accept(new Page(rowSet, next));
} catch (Exception ex) {
onFailure.accept(ex);
}
}
// no results
else {
onPage.accept(Page.last(Rows.empty(schema)));
}
}
static boolean shouldRetryDueToEmptyPage(SearchResponse response) {
CompositeAggregation composite = getComposite(response);
// if there are no buckets but a next page, go fetch it instead of sending an empty response to the client

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractor;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import java.util.BitSet;
@ -22,14 +21,11 @@ import static java.util.Collections.emptyList;
class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {
private final List<? extends CompositeAggregation.Bucket> buckets;
private final Cursor cursor;
private final int remainingData;
private final int size;
private int row = 0;
CompositeAggsRowSet(List<BucketExtractor> exts, BitSet mask, SearchResponse response,
int limit, byte[] next, boolean includeFrozen, String... indices) {
CompositeAggsRowSet(List<BucketExtractor> exts, BitSet mask, SearchResponse response, int limit, byte[] next) {
super(exts, mask);
CompositeAggregation composite = CompositeAggregationCursor.getComposite(response);
@ -43,7 +39,7 @@ class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {
size = limit == -1 ? buckets.size() : Math.min(buckets.size(), limit);
if (next == null) {
cursor = Cursor.EMPTY;
remainingData = 0;
} else {
// Compute remaining limit
@ -56,9 +52,9 @@ class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {
// however the Querier takes care of that and keeps making requests until either the query is invalid or at least one response
// is returned.
if (size == 0 || remainingLimit == 0) {
cursor = Cursor.EMPTY;
remainingData = 0;
} else {
cursor = new CompositeAggregationCursor(next, exts, mask, remainingLimit, includeFrozen, indices);
remainingData = remainingLimit;
}
}
}
@ -92,8 +88,7 @@ class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {
return size;
}
@Override
public Cursor nextPageCursor() {
return cursor;
int remainingData() {
return remainingData;
}
}

View File

@ -1,46 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.ListRowSet;
import org.elasticsearch.xpack.sql.type.Schema;
import java.util.List;
class PagingListRowSet extends ListRowSet {
private final int pageSize;
private final int columnCount;
private final Cursor cursor;
PagingListRowSet(List<List<?>> list, int columnCount, int pageSize) {
this(Schema.EMPTY, list, columnCount, pageSize);
}
PagingListRowSet(Schema schema, List<List<?>> list, int columnCount, int pageSize) {
super(schema, list);
this.columnCount = columnCount;
this.pageSize = Math.min(pageSize, list.size());
this.cursor = list.size() > pageSize ? new PagingListCursor(list, columnCount, pageSize) : Cursor.EMPTY;
}
@Override
public int size() {
return pageSize;
}
@Override
public int columnCount() {
return columnCount;
}
@Override
public Cursor nextPageCursor() {
return cursor;
}
}

View File

@ -20,7 +20,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
@ -57,6 +56,8 @@ import org.elasticsearch.xpack.sql.querydsl.container.SearchHitFieldRef;
import org.elasticsearch.xpack.sql.querydsl.container.TopHitsAggRef;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.ListCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
@ -75,6 +76,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.singletonList;
import static org.elasticsearch.action.ActionListener.wrap;
// TODO: add retry/back-off
public class Querier {
@ -98,7 +100,7 @@ public class Querier {
this.size = cfg.pageSize();
}
public void query(List<Attribute> output, QueryContainer query, String index, ActionListener<SchemaRowSet> listener) {
public void query(List<Attribute> output, QueryContainer query, String index, ActionListener<Page> listener) {
// prepare the request
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, filter, size);
// set query timeout
@ -152,22 +154,19 @@ public class Querier {
* results back to the client.
*/
@SuppressWarnings("rawtypes")
class LocalAggregationSorterListener implements ActionListener<SchemaRowSet> {
class LocalAggregationSorterListener implements ActionListener<Page> {
private final ActionListener<SchemaRowSet> listener;
private final ActionListener<Page> listener;
// keep the top N entries.
private final AggSortingQueue data;
private final AtomicInteger counter = new AtomicInteger();
private volatile Schema schema;
/**
* Match the default value for {@link MultiBucketConsumerService#MAX_BUCKET_SETTING}
*/
private static final int MAXIMUM_SIZE = 10_000;
private static final int MAXIMUM_SIZE = MultiBucketConsumerService.DEFAULT_MAX_BUCKETS;
private final boolean noLimit;
LocalAggregationSorterListener(ActionListener<SchemaRowSet> listener, List<Tuple<Integer, Comparator>> sortingColumns, int limit) {
LocalAggregationSorterListener(ActionListener<Page> listener, List<Tuple<Integer, Comparator>> sortingColumns, int limit) {
this.listener = listener;
int size = MAXIMUM_SIZE;
@ -185,20 +184,26 @@ public class Querier {
}
@Override
public void onResponse(SchemaRowSet schemaRowSet) {
schema = schemaRowSet.schema();
doResponse(schemaRowSet);
public void onResponse(Page page) {
// schema is set on the first page (as the rest don't hold the schema anymore)
if (schema == null) {
RowSet rowSet = page.rowSet();
if (rowSet instanceof SchemaRowSet) {
schema = ((SchemaRowSet) rowSet).schema();
} else {
onFailure(new SqlIllegalArgumentException("No schema found inside {}", rowSet.getClass()));
return;
}
}
private void doResponse(RowSet rowSet) {
// 1. consume all pages received
consumeRowSet(rowSet);
consumeRowSet(page.rowSet());
Cursor cursor = rowSet.nextPageCursor();
Cursor cursor = page.next();
// 1a. trigger a next call if there's still data
if (cursor != Cursor.EMPTY) {
// trigger a next call
planExecutor.nextPage(cfg, cursor, ActionListener.wrap(this::doResponse, this::onFailure));
planExecutor.nextPage(cfg, cursor, this);
// make sure to bail out afterwards as we'll get called by a different thread
return;
}
@ -222,7 +227,7 @@ public class Querier {
}
private void sendResponse() {
listener.onResponse(new PagingListRowSet(schema, data.asList(), schema.size(), cfg.pageSize()));
listener.onResponse(ListCursor.of(schema, data.asList(), cfg.pageSize()));
}
@Override
@ -264,13 +269,13 @@ public class Querier {
}
});
ImplicitGroupActionListener(ActionListener<SchemaRowSet> listener, Client client, Configuration cfg, List<Attribute> output,
ImplicitGroupActionListener(ActionListener<Page> listener, Client client, Configuration cfg, List<Attribute> output,
QueryContainer query, SearchRequest request) {
super(listener, client, cfg, output, query, request);
}
@Override
protected void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener) {
protected void handleResponse(SearchResponse response, ActionListener<Page> listener) {
Aggregations aggs = response.getAggregations();
if (aggs != null) {
Aggregation agg = aggs.get(Aggs.ROOT_GROUP_NAME);
@ -297,10 +302,10 @@ public class Querier {
for (int i = mask.nextSetBit(0); i >= 0; i = mask.nextSetBit(i + 1)) {
values[index++] = extractors.get(i).extract(implicitGroup);
}
listener.onResponse(Rows.singleton(schema, values));
listener.onResponse(Page.last(Rows.singleton(schema, values)));
} else if (buckets.isEmpty()) {
listener.onResponse(Rows.empty(schema));
listener.onResponse(Page.last(Rows.empty(schema)));
} else {
throw new SqlIllegalArgumentException("Too many groups returned by the implicit group; expected 1, received {}",
@ -315,43 +320,21 @@ public class Querier {
*/
static class CompositeActionListener extends BaseAggActionListener {
CompositeActionListener(ActionListener<SchemaRowSet> listener, Client client, Configuration cfg,
CompositeActionListener(ActionListener<Page> listener, Client client, Configuration cfg,
List<Attribute> output, QueryContainer query, SearchRequest request) {
super(listener, client, cfg, output, query, request);
}
@Override
protected void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener) {
// there are some results
if (response.getAggregations().asList().isEmpty() == false) {
protected void handleResponse(SearchResponse response, ActionListener<Page> listener) {
// retry
if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) {
CompositeAggregationCursor.updateCompositeAfterKey(response, request.source());
client.search(request, this);
return;
}
CompositeAggregationCursor.updateCompositeAfterKey(response, request.source());
byte[] nextSearch;
try {
nextSearch = CompositeAggregationCursor.serializeQuery(request.source());
} catch (Exception ex) {
listener.onFailure(ex);
return;
}
listener.onResponse(
new SchemaCompositeAggsRowSet(schema, initBucketExtractors(response), mask, response,
query.sortingColumns().isEmpty() ? query.limit() : -1,
nextSearch,
query.shouldIncludeFrozen(),
request.indices()));
}
// no results
else {
listener.onResponse(Rows.empty(schema));
}
CompositeAggregationCursor.handle(response, request.source(),
ba -> new SchemaCompositeAggsRowSet(schema, initBucketExtractors(response), mask, response,
query.sortingColumns().isEmpty() ? query.limit() : -1, ba),
() -> client.search(request, this),
p -> listener.onResponse(p),
e -> listener.onFailure(e),
schema, query.shouldIncludeFrozen(), request.indices());
}
}
@ -360,7 +343,7 @@ public class Querier {
final SearchRequest request;
final BitSet mask;
BaseAggActionListener(ActionListener<SchemaRowSet> listener, Client client, Configuration cfg, List<Attribute> output,
BaseAggActionListener(ActionListener<Page> listener, Client client, Configuration cfg, List<Attribute> output,
QueryContainer query, SearchRequest request) {
super(listener, client, cfg, output);
@ -425,7 +408,7 @@ public class Querier {
private final BitSet mask;
private final boolean multiValueFieldLeniency;
ScrollActionListener(ActionListener<SchemaRowSet> listener, Client client, Configuration cfg,
ScrollActionListener(ActionListener<Page> listener, Client client, Configuration cfg,
List<Attribute> output, QueryContainer query) {
super(listener, client, cfg, output);
this.query = query;
@ -434,9 +417,7 @@ public class Querier {
}
@Override
protected void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener) {
SearchHit[] hits = response.getHits().getHits();
protected void handleResponse(SearchResponse response, ActionListener<Page> listener) {
// create response extractors for the first time
List<Tuple<FieldExtraction, ExpressionId>> refs = query.fields();
@ -445,30 +426,10 @@ public class Querier {
exts.add(createExtractor(ref.v1()));
}
// there are some results
if (hits.length > 0) {
String scrollId = response.getScrollId();
SchemaSearchHitRowSet hitRowSet = new SchemaSearchHitRowSet(schema, exts, mask, hits, query.limit(), scrollId);
// if there's an id, try to setup next scroll
if (scrollId != null &&
// is all the content already retrieved?
(Boolean.TRUE.equals(response.isTerminatedEarly())
|| response.getHits().getTotalHits().value == hits.length
|| hitRowSet.isLimitReached())) {
// if so, clear the scroll
clear(response.getScrollId(), ActionListener.wrap(
succeeded -> listener.onResponse(new SchemaSearchHitRowSet(schema, exts, mask, hits, query.limit(), null)),
listener::onFailure));
} else {
listener.onResponse(hitRowSet);
}
}
// no hits
else {
clear(response.getScrollId(), ActionListener.wrap(succeeded -> listener.onResponse(Rows.empty(schema)),
listener::onFailure));
}
ScrollCursor.handle(response, () -> new SchemaSearchHitRowSet(schema, exts, mask, query.limit(), response),
p -> listener.onResponse(p),
p -> clear(response.getScrollId(), wrap(success -> listener.onResponse(p), listener::onFailure)),
schema);
}
private HitExtractor createExtractor(FieldExtraction ref) {
@ -514,14 +475,14 @@ public class Querier {
*/
abstract static class BaseActionListener implements ActionListener<SearchResponse> {
final ActionListener<SchemaRowSet> listener;
final ActionListener<Page> listener;
final Client client;
final Configuration cfg;
final TimeValue keepAlive;
final Schema schema;
BaseActionListener(ActionListener<SchemaRowSet> listener, Client client, Configuration cfg, List<Attribute> output) {
BaseActionListener(ActionListener<Page> listener, Client client, Configuration cfg, List<Attribute> output) {
this.listener = listener;
this.client = client;
@ -545,7 +506,7 @@ public class Querier {
}
}
protected abstract void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener);
protected abstract void handleResponse(SearchResponse response, ActionListener<Page> listener);
// clean-up the scroll in case of exception
protected final void cleanup(SearchResponse response, Exception ex) {

View File

@ -22,11 +22,8 @@ class SchemaCompositeAggsRowSet extends CompositeAggsRowSet implements SchemaRow
private final Schema schema;
SchemaCompositeAggsRowSet(Schema schema, List<BucketExtractor> exts, BitSet mask, SearchResponse response, int limitAggs,
byte[] next,
boolean includeFrozen,
String... indices) {
super(exts, mask, response, limitAggs, next, includeFrozen, indices);
SchemaCompositeAggsRowSet(Schema schema, List<BucketExtractor> exts, BitSet mask, SearchResponse r, int limitAggs, byte[] next) {
super(exts, mask, r, limitAggs, next);
this.schema = schema;
}

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.type.Schema;
@ -21,8 +21,8 @@ import java.util.List;
class SchemaSearchHitRowSet extends SearchHitRowSet implements SchemaRowSet {
private final Schema schema;
SchemaSearchHitRowSet(Schema schema, List<HitExtractor> exts, BitSet mask, SearchHit[] hits, int limitHits, String scrollId) {
super(exts, mask, hits, limitHits, scrollId);
SchemaSearchHitRowSet(Schema schema, List<HitExtractor> exts, BitSet mask, int limitHits, SearchResponse response) {
super(exts, mask, limitHits, response);
this.schema = schema;
}

View File

@ -14,18 +14,25 @@ import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.type.Schema;
import java.io.IOException;
import java.util.BitSet;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.elasticsearch.action.ActionListener.wrap;
public class ScrollCursor implements Cursor {
@ -83,30 +90,49 @@ public class ScrollCursor implements Cursor {
return limit;
}
@Override
public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<RowSet> listener) {
public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<Page> listener) {
if (log.isTraceEnabled()) {
log.trace("About to execute scroll query {}", scrollId);
}
SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(cfg.pageTimeout());
client.searchScroll(request, ActionListener.wrap((SearchResponse response) -> {
SearchHitRowSet rowSet = new SearchHitRowSet(extractors, mask, response.getHits().getHits(),
limit, response.getScrollId());
if (rowSet.nextPageCursor() == Cursor.EMPTY ) {
// we are finished with this cursor, let's clean it before continuing
clear(cfg, client, ActionListener.wrap(success -> listener.onResponse(rowSet), listener::onFailure));
} else {
listener.onResponse(rowSet);
}
client.searchScroll(request, wrap(response -> {
handle(response, () -> new SearchHitRowSet(extractors, mask, limit, response),
p -> listener.onResponse(p),
p -> clear(cfg, client, wrap(success -> listener.onResponse(p), listener::onFailure)),
Schema.EMPTY);
}, listener::onFailure));
}
@Override
public void clear(Configuration cfg, Client client, ActionListener<Boolean> listener) {
cleanCursor(client, scrollId,
ActionListener.wrap(
cleanCursor(client, scrollId, wrap(
clearScrollResponse -> listener.onResponse(clearScrollResponse.isSucceeded()),
listener::onFailure));
}
static void handle(SearchResponse response, Supplier<SearchHitRowSet> makeRowHit, Consumer<Page> onPage, Consumer<Page> clearScroll,
Schema schema) {
SearchHit[] hits = response.getHits().getHits();
// clean-up
if (hits.length > 0) {
SearchHitRowSet rowSet = makeRowHit.get();
Tuple<String, Integer> nextScrollData = rowSet.nextScrollData();
if (nextScrollData == null) {
// no more data, let's clean the scroll before continuing
clearScroll.accept(Page.last(rowSet));
} else {
Cursor next = new ScrollCursor(nextScrollData.v1(), rowSet.extractors(), rowSet.mask(), nextScrollData.v2());
onPage.accept(new Page(rowSet, next));
}
}
// no-hits
else {
clearScroll.accept(Page.last(Rows.empty(schema)));
}
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {

View File

@ -5,11 +5,13 @@
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.sql.session.Cursor;
import java.util.ArrayList;
import java.util.Arrays;
@ -29,18 +31,19 @@ import java.util.Set;
class SearchHitRowSet extends ResultRowSet<HitExtractor> {
private final SearchHit[] hits;
private final Map<SearchHit, Map<String, SearchHit[]>> flatInnerHits = new HashMap<>();
private final Cursor cursor;
private final Set<String> innerHits = new LinkedHashSet<>();
private final String innerHit;
private final int size;
private final int[] indexPerLevel;
private final Tuple<String, Integer> nextScrollData;
private int row = 0;
SearchHitRowSet(List<HitExtractor> exts, BitSet mask, SearchHit[] hits, int limit, String scrollId) {
SearchHitRowSet(List<HitExtractor> exts, BitSet mask, int limit, SearchResponse response) {
super(exts, mask);
this.hits = hits;
this.hits = response.getHits().getHits();
// Since the results might contain nested docs, the iteration is similar to that of Aggregation
// namely it discovers the nested docs and then, for iteration, increments the deepest level first
@ -81,24 +84,30 @@ class SearchHitRowSet extends ResultRowSet<HitExtractor> {
indexPerLevel = new int[maxDepth + 1];
this.innerHit = innerHit;
String scrollId = response.getScrollId();
if (scrollId == null) {
/* SearchResponse can contain a null scroll when you start a
* scroll but all results fit in the first page. */
cursor = Cursor.EMPTY;
nextScrollData = null;
} else {
TotalHits totalHits = response.getHits().getTotalHits();
// compute remaining limit (only if the limit is specified - that is, positive).
int remainingLimit = limit < 0 ? limit : limit - size;
// if the computed limit is zero, or the size is zero it means either there's nothing left or the limit has been reached
if (size == 0 || remainingLimit == 0) {
cursor = Cursor.EMPTY;
if (size == 0 || remainingLimit == 0
// or the scroll has ended
|| totalHits != null && totalHits.value == hits.length) {
nextScrollData = null;
} else {
cursor = new ScrollCursor(scrollId, extractors(), mask, remainingLimit);
nextScrollData = new Tuple<>(scrollId, remainingLimit);
}
}
}
protected boolean isLimitReached() {
return cursor == Cursor.EMPTY;
return nextScrollData == null;
}
@Override
@ -131,8 +140,8 @@ class SearchHitRowSet extends ResultRowSet<HitExtractor> {
int endOfPath = entry.getKey().lastIndexOf('_');
if (endOfPath >= 0 && entry.getKey().substring(0, endOfPath).equals(path)) {
SearchHit[] h = entry.getValue().getHits();
for (int i = 0; i < h.length; i++) {
lhm.put(h[i].getNestedIdentity().getOffset(), h[i]);
for (SearchHit element : h) {
lhm.put(element.getNestedIdentity().getOffset(), element);
}
}
}
@ -210,8 +219,7 @@ class SearchHitRowSet extends ResultRowSet<HitExtractor> {
return size;
}
@Override
public Cursor nextPageCursor() {
return cursor;
Tuple<String, Integer> nextScrollData() {
return nextScrollData;
}
}

View File

@ -7,11 +7,11 @@ package org.elasticsearch.xpack.sql.plan.logical;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.Executable;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Source;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
import org.elasticsearch.xpack.sql.tree.Source;
import java.util.List;
import java.util.Objects;
@ -52,7 +52,7 @@ public class LocalRelation extends LogicalPlan implements Executable {
}
@Override
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
public void execute(SqlSession session, ActionListener<Page> listener) {
executable.execute(session, listener);
}

View File

@ -7,7 +7,11 @@ package org.elasticsearch.xpack.sql.plan.logical.command;
import org.elasticsearch.xpack.sql.expression.FieldAttribute;
import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.Executable;
import org.elasticsearch.xpack.sql.session.ListCursor;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Source;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.EsField;
@ -51,4 +55,8 @@ public abstract class Command extends LogicalPlan implements Executable {
private FieldAttribute field(String name, EsField field) {
return new FieldAttribute(source(), name, field);
}
protected Page of(SqlSession session, List<List<?>> values) {
return ListCursor.of(Rows.schema(output()), values, session.configuration().pageSize());
}
}

View File

@ -12,13 +12,13 @@ import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.sql.rule.RuleExecutor.Batch;
import org.elasticsearch.xpack.sql.rule.RuleExecutor.ExecutionInfo;
import org.elasticsearch.xpack.sql.rule.RuleExecutor.Transformation;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Source;
import org.elasticsearch.xpack.sql.tree.Node;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
import org.elasticsearch.xpack.sql.tree.NodeUtils;
import org.elasticsearch.xpack.sql.tree.Source;
import org.elasticsearch.xpack.sql.type.KeywordEsField;
import org.elasticsearch.xpack.sql.util.Graphviz;
@ -75,7 +75,7 @@ public class Debug extends Command {
}
@Override
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
public void execute(SqlSession session, ActionListener<Page> listener) {
switch (type) {
case ANALYZED:
session.debugAnalyzedPlan(plan, wrap(i -> handleInfo(i, listener), listener::onFailure));
@ -90,7 +90,7 @@ public class Debug extends Command {
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private void handleInfo(ExecutionInfo info, ActionListener<SchemaRowSet> listener) {
private void handleInfo(ExecutionInfo info, ActionListener<Page> listener) {
String planString = null;
if (format == Format.TEXT) {
@ -135,7 +135,7 @@ public class Debug extends Command {
}
}
listener.onResponse(Rows.singleton(output(), planString));
listener.onResponse(Page.last(Rows.singleton(output(), planString)));
}
@Override

View File

@ -13,11 +13,11 @@ import org.elasticsearch.xpack.sql.plan.QueryPlan;
import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.sql.planner.Planner;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Source;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
import org.elasticsearch.xpack.sql.tree.Source;
import org.elasticsearch.xpack.sql.type.KeywordEsField;
import org.elasticsearch.xpack.sql.util.Graphviz;
@ -85,10 +85,10 @@ public class Explain extends Command {
}
@Override
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
public void execute(SqlSession session, ActionListener<Page> listener) {
if (type == Type.PARSED) {
listener.onResponse(Rows.singleton(output(), formatPlan(format, plan)));
listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, plan))));
return;
}
@ -96,7 +96,7 @@ public class Explain extends Command {
session.analyzedPlan(plan, verify, wrap(analyzedPlan -> {
if (type == Type.ANALYZED) {
listener.onResponse(Rows.singleton(output(), formatPlan(format, analyzedPlan)));
listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, analyzedPlan))));
return;
}
@ -105,25 +105,25 @@ public class Explain extends Command {
if (verify) {
session.optimizedPlan(analyzedPlan, wrap(optimizedPlan -> {
if (type == Type.OPTIMIZED) {
listener.onResponse(Rows.singleton(output(), formatPlan(format, optimizedPlan)));
listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, optimizedPlan))));
return;
}
PhysicalPlan mappedPlan = planner.mapPlan(optimizedPlan, verify);
if (type == Type.MAPPED) {
listener.onResponse(Rows.singleton(output(), formatPlan(format, mappedPlan)));
listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, mappedPlan))));
return;
}
PhysicalPlan executablePlan = planner.foldPlan(mappedPlan, verify);
if (type == Type.EXECUTABLE) {
listener.onResponse(Rows.singleton(output(), formatPlan(format, executablePlan)));
listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, executablePlan))));
return;
}
// Type.All
listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan,
mappedPlan, executablePlan)));
listener.onResponse(Page.last(
Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan, mappedPlan, executablePlan))));
}, listener::onFailure));
}
@ -133,14 +133,14 @@ public class Explain extends Command {
if (session.verifier().verifyFailures(analyzedPlan).isEmpty()) {
session.optimizedPlan(analyzedPlan, wrap(optimizedPlan -> {
if (type == Type.OPTIMIZED) {
listener.onResponse(Rows.singleton(output(), formatPlan(format, optimizedPlan)));
listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, optimizedPlan))));
return;
}
PhysicalPlan mappedPlan = planner.mapPlan(optimizedPlan, verify);
if (type == Type.MAPPED) {
listener.onResponse(Rows.singleton(output(), formatPlan(format, mappedPlan)));
listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, mappedPlan))));
return;
}
@ -148,30 +148,30 @@ public class Explain extends Command {
PhysicalPlan executablePlan = planner.foldPlan(mappedPlan, verify);
if (type == Type.EXECUTABLE) {
listener.onResponse(Rows.singleton(output(), formatPlan(format, executablePlan)));
listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, executablePlan))));
return;
}
listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan,
mappedPlan, executablePlan)));
listener.onResponse(Page.last(Rows.singleton(output(),
printPlans(format, plan, analyzedPlan, optimizedPlan, mappedPlan, executablePlan))));
return;
}
// mapped failed
if (type != Type.ALL) {
listener.onResponse(Rows.singleton(output(), formatPlan(format, mappedPlan)));
listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, mappedPlan))));
return;
}
listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan,
mappedPlan, null)));
listener.onResponse(Page
.last(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan, mappedPlan, null))));
}, listener::onFailure));
// cannot continue
} else {
if (type != Type.ALL) {
listener.onResponse(Rows.singleton(output(), formatPlan(format, analyzedPlan)));
listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, analyzedPlan))));
}
else {
listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, null, null, null)));
listener.onResponse(Page.last(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, null, null, null))));
}
}
}

View File

@ -9,8 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.FieldAttribute;
import org.elasticsearch.xpack.sql.expression.predicate.regex.LikePattern;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
import org.elasticsearch.xpack.sql.tree.Source;
@ -61,7 +60,7 @@ public class ShowColumns extends Command {
}
@Override
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
public void execute(SqlSession session, ActionListener<Page> listener) {
String idx = index != null ? index : (pattern != null ? pattern.asIndexNameWildcard() : "*");
String regex = pattern != null ? pattern.asJavaRegex() : null;
@ -73,7 +72,7 @@ public class ShowColumns extends Command {
rows = new ArrayList<>();
fillInRows(indexResult.get().mapping(), null, rows);
}
listener.onResponse(Rows.of(output(), rows));
listener.onResponse(of(session, rows));
},
listener::onFailure));
}

View File

@ -11,11 +11,10 @@ import org.elasticsearch.xpack.sql.expression.FieldAttribute;
import org.elasticsearch.xpack.sql.expression.function.FunctionDefinition;
import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.sql.expression.predicate.regex.LikePattern;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Source;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
import org.elasticsearch.xpack.sql.tree.Source;
import org.elasticsearch.xpack.sql.type.KeywordEsField;
import java.util.Collection;
@ -50,11 +49,11 @@ public class ShowFunctions extends Command {
}
@Override
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
public void execute(SqlSession session, ActionListener<Page> listener) {
FunctionRegistry registry = session.functionRegistry();
Collection<FunctionDefinition> functions = registry.listFunctions(pattern != null ? pattern.asJavaRegex() : null);
listener.onResponse(Rows.of(output(), functions.stream()
listener.onResponse(of(session, functions.stream()
.map(f -> asList(f.name(), f.type().name()))
.collect(toList())));
}

View File

@ -8,11 +8,11 @@ package org.elasticsearch.xpack.sql.plan.logical.command;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.FieldAttribute;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Source;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
import org.elasticsearch.xpack.sql.tree.Source;
import org.elasticsearch.xpack.sql.type.KeywordEsField;
import java.util.List;
@ -36,8 +36,8 @@ public class ShowSchemas extends Command {
}
@Override
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
listener.onResponse(Rows.empty(output()));
public void execute(SqlSession session, ActionListener<Page> listener) {
listener.onResponse(Page.last(Rows.empty(output())));
}
@Override

View File

@ -9,8 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.analysis.index.IndexResolver.IndexType;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.predicate.regex.LikePattern;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
import org.elasticsearch.xpack.sql.tree.Source;
@ -54,7 +53,7 @@ public class ShowTables extends Command {
}
@Override
public final void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
public final void execute(SqlSession session, ActionListener<Page> listener) {
String idx = index != null ? index : (pattern != null ? pattern.asIndexNameWildcard() : "*");
String regex = pattern != null ? pattern.asJavaRegex() : null;
@ -63,7 +62,7 @@ public class ShowTables extends Command {
IndexType.VALID_INCLUDE_FROZEN : IndexType.VALID_REGULAR;
session.indexResolver().resolveNames(idx, regex, withFrozen, ActionListener.wrap(result -> {
listener.onResponse(Rows.of(output(), result.stream()
listener.onResponse(of(session, result.stream()
.map(t -> asList(t.name(), t.type().toSql(), t.type().toNative()))
.collect(toList())));
}, listener::onFailure));

View File

@ -13,8 +13,8 @@ import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.predicate.regex.LikePattern;
import org.elasticsearch.xpack.sql.plan.logical.command.Command;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
import org.elasticsearch.xpack.sql.tree.Source;
@ -97,14 +97,14 @@ public class SysColumns extends Command {
}
@Override
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
public void execute(SqlSession session, ActionListener<Page> listener) {
Mode mode = session.configuration().mode();
List<Attribute> output = output(mode == Mode.ODBC);
String cluster = session.indexResolver().clusterName();
// bail-out early if the catalog is present but differs
if (Strings.hasText(catalog) && cluster.equals(catalog) == false) {
listener.onResponse(Rows.empty(output));
listener.onResponse(Page.last(Rows.empty(output)));
return;
}
@ -125,7 +125,7 @@ public class SysColumns extends Command {
fillInRows(cluster, esIndex.name(), esIndex.mapping(), null, rows, columnMatcher, mode);
}
listener.onResponse(Rows.of(output, rows));
listener.onResponse(of(session, rows));
}, listener::onFailure));
}
// otherwise use a merged mapping
@ -138,7 +138,7 @@ public class SysColumns extends Command {
fillInRows(cluster, indexName, esIndex.mapping(), null, rows, columnMatcher, mode);
}
listener.onResponse(Rows.of(output, rows));
listener.onResponse(of(session, rows));
}, listener::onFailure));
}
}

View File

@ -11,8 +11,8 @@ import org.elasticsearch.xpack.sql.analysis.index.IndexResolver.IndexType;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.predicate.regex.LikePattern;
import org.elasticsearch.xpack.sql.plan.logical.command.Command;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
import org.elasticsearch.xpack.sql.tree.Source;
@ -70,7 +70,7 @@ public class SysTables extends Command {
}
@Override
public final void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
public final void execute(SqlSession session, ActionListener<Page> listener) {
String cluster = session.indexResolver().clusterName();
// first check if where dealing with ODBC enumeration
@ -85,7 +85,7 @@ public class SysTables extends Command {
Object[] enumeration = new Object[10];
// send only the cluster, everything else null
enumeration[0] = cluster;
listener.onResponse(Rows.singleton(output(), enumeration));
listener.onResponse(Page.last(Rows.singleton(output(), enumeration)));
return;
}
}
@ -111,7 +111,7 @@ public class SysTables extends Command {
}
values.sort(Comparator.comparing(l -> l.get(3).toString()));
listener.onResponse(Rows.of(output(), values));
listener.onResponse(of(session, values));
return;
}
}
@ -122,7 +122,7 @@ public class SysTables extends Command {
// if the catalog doesn't match, don't return any results
if (cRegex != null && !Pattern.matches(cRegex, cluster)) {
listener.onResponse(Rows.empty(output()));
listener.onResponse(Page.last(Rows.empty(output())));
return;
}
@ -141,7 +141,7 @@ public class SysTables extends Command {
}
session.indexResolver().resolveNames(idx, regex, tableTypes, ActionListener.wrap(result -> listener.onResponse(
Rows.of(output(), result.stream()
of(session, result.stream()
// sort by type (which might be legacy), then by name
.sorted(Comparator.<IndexInfo, String> comparing(i -> legacyName(i.type()))
.thenComparing(Comparator.comparing(i -> i.name())))

View File

@ -8,8 +8,7 @@ package org.elasticsearch.xpack.sql.plan.logical.command.sys;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.plan.logical.command.Command;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
import org.elasticsearch.xpack.sql.tree.Source;
@ -72,7 +71,7 @@ public class SysTypes extends Command {
}
@Override
public final void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
public final void execute(SqlSession session, ActionListener<Page> listener) {
Stream<DataType> values = Stream.of(DataType.values());
if (type.intValue() != 0) {
values = values.filter(t -> type.equals(t.sqlType.getVendorTypeNumber()));
@ -110,7 +109,7 @@ public class SysTypes extends Command {
))
.collect(toList());
listener.onResponse(Rows.of(output(), rows));
listener.onResponse(of(session, rows));
}
@Override

View File

@ -8,14 +8,16 @@ package org.elasticsearch.xpack.sql.plan.physical;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.plan.logical.command.Command;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Source;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
import org.elasticsearch.xpack.sql.tree.Source;
import java.util.List;
import java.util.Objects;
import static org.elasticsearch.action.ActionListener.wrap;
public class CommandExec extends LeafExec {
private final Command command;
@ -35,8 +37,8 @@ public class CommandExec extends LeafExec {
}
@Override
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
command.execute(session, listener);
public void execute(SqlSession session, ActionListener<Page> listener) {
command.execute(session, wrap(listener::onResponse, listener::onFailure));
}
@Override

View File

@ -9,7 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.execution.search.Querier;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
import org.elasticsearch.xpack.sql.tree.Source;
@ -53,7 +53,7 @@ public class EsQueryExec extends LeafExec {
}
@Override
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
public void execute(SqlSession session, ActionListener<Page> listener) {
Querier scroller = new Querier(session);
scroller.query(output, queryContainer, index, listener);

View File

@ -7,12 +7,12 @@ package org.elasticsearch.xpack.sql.plan.physical;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.EmptyExecutable;
import org.elasticsearch.xpack.sql.session.Executable;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Source;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
import org.elasticsearch.xpack.sql.tree.Source;
import java.util.List;
import java.util.Objects;
@ -45,7 +45,7 @@ public class LocalExec extends LeafExec {
}
@Override
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
public void execute(SqlSession session, ActionListener<Page> listener) {
executable.execute(session, listener);
}

View File

@ -7,15 +7,16 @@ package org.elasticsearch.xpack.sql.plan.physical;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.planner.PlanningException;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.Executable;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
// this is mainly a marker interface to validate a plan before being executed
public interface Unexecutable extends Executable {
default void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
@Override
default void execute(SqlSession session, ActionListener<Page> listener) {
throw new PlanningException("Current plan {} is not executable", this);
}
}

View File

@ -13,7 +13,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.action.BasicFormatter;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import java.io.IOException;
import java.util.Objects;
@ -59,7 +58,7 @@ public class TextFormatterCursor implements Cursor {
}
@Override
public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<RowSet> listener) {
public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<Page> listener) {
delegate.nextPage(cfg, client, registry, listener);
}

View File

@ -18,6 +18,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.action.SqlQueryAction;
import org.elasticsearch.xpack.sql.action.SqlQueryRequest;
import org.elasticsearch.xpack.sql.action.SqlQueryResponse;
@ -25,6 +26,7 @@ import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.proto.ColumnInfo;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.Cursors;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
@ -76,15 +78,21 @@ public class TransportSqlQueryAction extends HandledTransportAction<SqlQueryRequ
if (Strings.hasText(request.cursor()) == false) {
planExecutor.sql(cfg, request.query(), request.params(),
wrap(rowSet -> listener.onResponse(createResponse(request, rowSet)), listener::onFailure));
wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), listener::onFailure));
} else {
planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()),
wrap(rowSet -> listener.onResponse(createResponse(request.mode(), request.columnar(), rowSet, null)),
wrap(p -> listener.onResponse(createResponse(request.mode(), request.columnar(), null, p)),
listener::onFailure));
}
}
static SqlQueryResponse createResponse(SqlQueryRequest request, SchemaRowSet rowSet) {
static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) {
RowSet rset = page.rowSet();
if ((rset instanceof SchemaRowSet) == false) {
throw new SqlIllegalArgumentException("No schema found inside {}", rset.getClass());
}
SchemaRowSet rowSet = (SchemaRowSet) rset;
List<ColumnInfo> columns = new ArrayList<>(rowSet.columnCount());
for (Schema.Entry entry : rowSet.schema()) {
if (Mode.isDriver(request.mode())) {
@ -94,22 +102,22 @@ public class TransportSqlQueryAction extends HandledTransportAction<SqlQueryRequ
}
}
columns = unmodifiableList(columns);
return createResponse(request.mode(), request.columnar(), rowSet, columns);
return createResponse(request.mode(), request.columnar(), columns, page);
}
static SqlQueryResponse createResponse(Mode mode, boolean columnar, RowSet rowSet, List<ColumnInfo> columns) {
static SqlQueryResponse createResponse(Mode mode, boolean columnar, List<ColumnInfo> header, Page page) {
List<List<Object>> rows = new ArrayList<>();
rowSet.forEachRow(rowView -> {
page.rowSet().forEachRow(rowView -> {
List<Object> row = new ArrayList<>(rowView.columnCount());
rowView.forEachColumn(row::add);
rows.add(unmodifiableList(row));
});
return new SqlQueryResponse(
Cursors.encodeToString(Version.CURRENT, rowSet.nextPageCursor()),
Cursors.encodeToString(Version.CURRENT, page.next()),
mode,
columnar,
columns,
header,
rows);
}
}

View File

@ -14,12 +14,35 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
* Information required to access the next page of response.
*/
public interface Cursor extends NamedWriteable {
class Page {
private final RowSet rowSet;
private final Cursor next;
public Page(RowSet rowSet, Cursor next) {
this.rowSet = rowSet;
this.next = next;
}
public RowSet rowSet() {
return rowSet;
}
public Cursor next() {
return next;
}
public static Page last(RowSet rowSet) {
return new Page(rowSet, EMPTY);
}
}
Cursor EMPTY = EmptyCursor.INSTANCE;
/**
* Request the next page of data.
*/
void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<RowSet> listener);
void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<Page> listener);
/**
* Cleans the resources associated with the cursor

View File

@ -14,7 +14,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.execution.search.CompositeAggregationCursor;
import org.elasticsearch.xpack.sql.execution.search.PagingListCursor;
import org.elasticsearch.xpack.sql.execution.search.ScrollCursor;
import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractors;
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors;
@ -49,7 +48,7 @@ public final class Cursors {
entries.add(new NamedWriteableRegistry.Entry(Cursor.class, ScrollCursor.NAME, ScrollCursor::new));
entries.add(new NamedWriteableRegistry.Entry(Cursor.class, CompositeAggregationCursor.NAME, CompositeAggregationCursor::new));
entries.add(new NamedWriteableRegistry.Entry(Cursor.class, TextFormatterCursor.NAME, TextFormatterCursor::new));
entries.add(new NamedWriteableRegistry.Entry(Cursor.class, PagingListCursor.NAME, PagingListCursor::new));
entries.add(new NamedWriteableRegistry.Entry(Cursor.class, ListCursor.NAME, ListCursor::new));
// plus all their dependencies
entries.addAll(Processors.getNamedWriteables());

View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import java.io.IOException;
@ -31,8 +32,8 @@ class EmptyCursor implements Cursor {
}
@Override
public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<RowSet> listener) {
throw new IllegalArgumentException("there is no next page");
public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<Page> listener) {
throw new SqlIllegalArgumentException("there is no next page");
}
@Override

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import java.util.List;
import java.util.Objects;
@ -25,8 +26,8 @@ public class EmptyExecutable implements Executable {
}
@Override
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
listener.onResponse(Rows.empty(output));
public void execute(SqlSession session, ActionListener<Page> listener) {
listener.onResponse(Page.last(Rows.empty(output)));
}
@Override

View File

@ -39,11 +39,6 @@ class EmptyRowSet extends AbstractRowSet implements SchemaRowSet {
return 0;
}
@Override
public Cursor nextPageCursor() {
return Cursor.EMPTY;
}
@Override
public Schema schema() {
return schema;

View File

@ -5,14 +5,15 @@
*/
package org.elasticsearch.xpack.sql.session;
import java.util.List;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import java.util.List;
public interface Executable {
List<Attribute> output();
void execute(SqlSession session, ActionListener<SchemaRowSet> listener);
void execute(SqlSession session, ActionListener<Page> listener);
}

View File

@ -4,16 +4,14 @@
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.execution.search;
package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.type.Schema;
import java.io.IOException;
import java.util.List;
@ -21,7 +19,7 @@ import java.util.Objects;
import static java.util.Collections.emptyList;
public class PagingListCursor implements Cursor {
public class ListCursor implements Cursor {
public static final String NAME = "p";
@ -29,14 +27,14 @@ public class PagingListCursor implements Cursor {
private final int columnCount;
private final int pageSize;
PagingListCursor(List<List<?>> data, int columnCount, int pageSize) {
public ListCursor(List<List<?>> data, int pageSize, int columnCount) {
this.data = data;
this.columnCount = columnCount;
this.pageSize = pageSize;
}
@SuppressWarnings("unchecked")
public PagingListCursor(StreamInput in) throws IOException {
public ListCursor(StreamInput in) throws IOException {
data = (List<List<?>>) in.readGenericValue();
columnCount = in.readVInt();
pageSize = in.readVInt();
@ -66,11 +64,27 @@ public class PagingListCursor implements Cursor {
return pageSize;
}
@Override
public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<RowSet> listener) {
// the check is really a safety measure since the page initialization handles it already (by returning an empty cursor)
public static Page of(Schema schema, List<List<?>> data, int pageSize) {
return of(schema, data, pageSize, schema.size());
}
// NB: private since the columnCount is for public cases inferred by the columnCount
// only on the next-page the schema becomes null however that's an internal detail hence
// why this method is not exposed
private static Page of(Schema schema, List<List<?>> data, int pageSize, int columnCount) {
List<List<?>> nextData = data.size() > pageSize ? data.subList(pageSize, data.size()) : emptyList();
listener.onResponse(new PagingListRowSet(nextData, columnCount, pageSize));
Cursor next = nextData.isEmpty()
? Cursor.EMPTY
: new ListCursor(nextData, pageSize, columnCount);
List<List<?>> currData = data.isEmpty() || pageSize == 0
? emptyList()
: data.size() == pageSize ? data : data.subList(0, Math.min(pageSize, data.size()));
return new Page(new ListRowSet(schema, currData, columnCount), next);
}
@Override
public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<Page> listener) {
listener.onResponse(of(Schema.EMPTY, data, pageSize, columnCount));
}
@Override
@ -93,7 +107,7 @@ public class PagingListCursor implements Cursor {
return false;
}
PagingListCursor other = (PagingListCursor) obj;
ListCursor other = (ListCursor) obj;
return Objects.equals(pageSize, other.pageSize)
&& Objects.equals(columnCount, other.columnCount)
&& Objects.equals(data, other.data);

View File

@ -13,13 +13,20 @@ public class ListRowSet extends AbstractRowSet implements SchemaRowSet {
private final Schema schema;
private final List<List<?>> list;
private final int columnCount;
private int pos = 0;
protected ListRowSet(Schema schema, List<List<?>> list) {
ListRowSet(Schema schema, List<List<?>> list) {
this(schema, list, schema.size());
}
ListRowSet(Schema schema, List<List<?>> list, int columnCount) {
this.schema = schema;
this.columnCount = columnCount;
this.list = list;
}
@Override
protected boolean doHasCurrent() {
return pos < size();
@ -49,13 +56,13 @@ public class ListRowSet extends AbstractRowSet implements SchemaRowSet {
return list.size();
}
@Override
public Cursor nextPageCursor() {
return Cursor.EMPTY;
}
@Override
public Schema schema() {
return schema;
}
@Override
public int columnCount() {
return columnCount;
}
}

View File

@ -22,11 +22,6 @@ public interface RowSet extends RowView {
void reset();
/**
* The key used by PlanExecutor#nextPage to fetch the next page.
*/
Cursor nextPageCursor();
default void forEachRow(Consumer<? super RowView> action) {
for (boolean hasRows = hasCurrentRow(); hasRows; hasRows = advanceRow()) {
action.accept(this);

View File

@ -7,18 +7,20 @@ package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.util.Check;
import java.util.Collections;
import java.util.List;
import static java.util.Collections.emptyList;
public class SingletonExecutable implements Executable {
private final List<Attribute> output;
private final Object[] values;
public SingletonExecutable() {
this(Collections.emptyList());
this(emptyList());
}
public SingletonExecutable(List<Attribute> output, Object... values) {
@ -33,8 +35,8 @@ public class SingletonExecutable implements Executable {
}
@Override
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
listener.onResponse(Rows.singleton(output, values));
public void execute(SqlSession session, ActionListener<Page> listener) {
listener.onResponse(Page.last(Rows.singleton(output, values)));
}
@Override

View File

@ -43,11 +43,6 @@ class SingletonRowSet extends AbstractRowSet implements SchemaRowSet {
return 1;
}
@Override
public Cursor nextPageCursor() {
return Cursor.EMPTY;
}
@Override
public Schema schema() {
return schema;

View File

@ -26,6 +26,7 @@ import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.sql.planner.Planner;
import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;
import org.elasticsearch.xpack.sql.rule.RuleExecutor;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import java.util.List;
import java.util.function.Function;
@ -159,7 +160,7 @@ public class SqlSession {
optimizedPlan(optimized, wrap(o -> listener.onResponse(planner.plan(o, verify)), listener::onFailure));
}
public void sql(String sql, List<SqlTypedParamValue> params, ActionListener<SchemaRowSet> listener) {
public void sql(String sql, List<SqlTypedParamValue> params, ActionListener<Page> listener) {
sqlExecutable(sql, params, wrap(e -> e.execute(this, listener), listener::onFailure));
}

View File

@ -514,7 +514,7 @@ public class SysColumnsTests extends ESTestCase {
return Void.TYPE;
}).when(resolver).resolveAsMergedMapping(any(), any(), anyBoolean(), any());
tuple.v1().execute(tuple.v2(), wrap(consumer::accept, ex -> fail(ex.getMessage())));
tuple.v1().execute(tuple.v2(), wrap(p -> consumer.accept((SchemaRowSet) p.rowSet()), ex -> fail(ex.getMessage())));
}
private Tuple<Command, SqlSession> sql(String sql, List<SqlTypedParamValue> params, Map<String, EsField> mapping) {

View File

@ -386,6 +386,6 @@ public class SysTablesTests extends ESTestCase {
return Void.TYPE;
}).when(resolver).resolveNames(any(), any(), any(), any());
tuple.v1().execute(tuple.v2(), wrap(consumer::accept, ex -> fail(ex.getMessage())));
tuple.v1().execute(tuple.v2(), wrap(p -> consumer.accept((SchemaRowSet) p.rowSet()), ex -> fail(ex.getMessage())));
}
}

View File

@ -15,6 +15,7 @@ import org.elasticsearch.xpack.sql.analysis.index.IndexResolver;
import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.sql.parser.SqlParser;
import org.elasticsearch.xpack.sql.plan.logical.command.Command;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.TypesTests;
@ -50,7 +51,8 @@ public class SysTypesTests extends ESTestCase {
"INTERVAL_HOUR_TO_MINUTE", "INTERVAL_HOUR_TO_SECOND", "INTERVAL_MINUTE_TO_SECOND",
"GEO_SHAPE", "GEO_POINT", "UNSUPPORTED", "OBJECT", "NESTED");
cmd.execute(null, wrap(r -> {
cmd.execute(session(), wrap(p -> {
SchemaRowSet r = (SchemaRowSet) p.rowSet();
assertEquals(19, r.columnCount());
assertEquals(DataType.values().length, r.size());
assertFalse(r.schema().types().contains(DataType.NULL));
@ -72,7 +74,8 @@ public class SysTypesTests extends ESTestCase {
public void testSysTypesDefaultFiltering() {
Command cmd = sql("SYS TYPES 0").v1();
cmd.execute(null, wrap(r -> {
cmd.execute(session(), wrap(p -> {
SchemaRowSet r = (SchemaRowSet) p.rowSet();
assertEquals(DataType.values().length, r.size());
}, ex -> fail(ex.getMessage())));
}
@ -81,7 +84,8 @@ public class SysTypesTests extends ESTestCase {
// boolean = 16
Command cmd = sql("SYS TYPES " + JDBCType.BOOLEAN.getVendorTypeNumber()).v1();
cmd.execute(null, wrap(r -> {
cmd.execute(session(), wrap(p -> {
SchemaRowSet r = (SchemaRowSet) p.rowSet();
assertEquals(1, r.size());
assertEquals("BOOLEAN", r.column(0));
}, ex -> fail(ex.getMessage())));
@ -90,7 +94,8 @@ public class SysTypesTests extends ESTestCase {
public void testSysTypesNegativeFiltering() {
Command cmd = sql("SYS TYPES " + JDBCType.TINYINT.getVendorTypeNumber()).v1();
cmd.execute(null, wrap(r -> {
cmd.execute(session(), wrap(p -> {
SchemaRowSet r = (SchemaRowSet) p.rowSet();
assertEquals(1, r.size());
assertEquals("BYTE", r.column(0));
}, ex -> fail(ex.getMessage())));
@ -99,7 +104,8 @@ public class SysTypesTests extends ESTestCase {
public void testSysTypesMultipleMatches() {
Command cmd = sql("SYS TYPES " + JDBCType.VARCHAR.getVendorTypeNumber()).v1();
cmd.execute(null, wrap(r -> {
cmd.execute(session(), wrap(p -> {
SchemaRowSet r = (SchemaRowSet) p.rowSet();
assertEquals(3, r.size());
assertEquals("KEYWORD", r.column(0));
assertTrue(r.advanceRow());
@ -108,4 +114,8 @@ public class SysTypesTests extends ESTestCase {
assertEquals("IP", r.column(0));
}, ex -> fail(ex.getMessage())));
}
private static SqlSession session() {
return new SqlSession(TestUtils.TEST_CFG, null, null, null, null, null, null, null, null);
}
}

View File

@ -3,21 +3,22 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.execution.search;
package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.sql.session.Cursors;
import org.elasticsearch.xpack.sql.session.ListCursor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class PagingListCursorTests extends AbstractWireSerializingTestCase<PagingListCursor> {
public static PagingListCursor randomPagingListCursor() {
public class ListCursorTests extends AbstractWireSerializingTestCase<ListCursor> {
public static ListCursor randomPagingListCursor() {
int size = between(1, 20);
int depth = between(1, 20);
@ -26,14 +27,14 @@ public class PagingListCursorTests extends AbstractWireSerializingTestCase<Pagin
values.add(Arrays.asList(randomArray(depth, s -> new Object[depth], () -> randomByte())));
}
return new PagingListCursor(values, depth, between(1, 20));
return new ListCursor(values, between(1, 20), depth);
}
@Override
protected PagingListCursor mutateInstance(PagingListCursor instance) throws IOException {
return new PagingListCursor(instance.data(),
instance.columnCount(),
randomValueOtherThan(instance.pageSize(), () -> between(1, 20)));
protected ListCursor mutateInstance(ListCursor instance) throws IOException {
return new ListCursor(instance.data(),
randomValueOtherThan(instance.pageSize(), () -> between(1, 20)),
instance.columnCount());
}
@Override
@ -42,22 +43,22 @@ public class PagingListCursorTests extends AbstractWireSerializingTestCase<Pagin
}
@Override
protected PagingListCursor createTestInstance() {
protected ListCursor createTestInstance() {
return randomPagingListCursor();
}
@Override
protected Reader<PagingListCursor> instanceReader() {
return PagingListCursor::new;
protected Reader<ListCursor> instanceReader() {
return ListCursor::new;
}
@Override
protected PagingListCursor copyInstance(PagingListCursor instance, Version version) throws IOException {
protected ListCursor copyInstance(ListCursor instance, Version version) throws IOException {
/* Randomly choose between internal protocol round trip and String based
* round trips used to toXContent. */
if (randomBoolean()) {
return super.copyInstance(instance, version);
}
return (PagingListCursor) Cursors.decodeFromString(Cursors.encodeToString(version, instance));
return (ListCursor) Cursors.decodeFromString(Cursors.encodeToString(version, instance));
}
}