From 96883dd028059a4f5849ed2150cd482bad4ab1a8 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Fri, 16 Aug 2019 15:39:36 +0300 Subject: [PATCH] SQL: Refactor away the cycle between Rowset and Cursor (#45516) Improve encapsulation of pagination of rowsets by breaking the cycle between cursor and associated rowset implementation, all logic now residing inside each cursor implementation. (cherry picked from commit be8fe0a0ce562fe732fae12a0b236b5731e4638c) --- .../xpack/sql/execution/PlanExecutor.java | 7 +- .../search/CompositeAggregationCursor.java | 60 +++++--- .../execution/search/CompositeAggsRowSet.java | 21 ++- .../execution/search/PagingListRowSet.java | 46 ------ .../xpack/sql/execution/search/Querier.java | 133 +++++++----------- .../search/SchemaCompositeAggsRowSet.java | 7 +- .../search/SchemaSearchHitRowSet.java | 6 +- .../sql/execution/search/ScrollCursor.java | 54 +++++-- .../sql/execution/search/SearchHitRowSet.java | 38 +++-- .../xpack/sql/plan/logical/LocalRelation.java | 6 +- .../sql/plan/logical/command/Command.java | 8 ++ .../xpack/sql/plan/logical/command/Debug.java | 10 +- .../sql/plan/logical/command/Explain.java | 40 +++--- .../sql/plan/logical/command/ShowColumns.java | 7 +- .../plan/logical/command/ShowFunctions.java | 9 +- .../sql/plan/logical/command/ShowSchemas.java | 8 +- .../sql/plan/logical/command/ShowTables.java | 7 +- .../plan/logical/command/sys/SysColumns.java | 10 +- .../plan/logical/command/sys/SysTables.java | 12 +- .../plan/logical/command/sys/SysTypes.java | 7 +- .../xpack/sql/plan/physical/CommandExec.java | 10 +- .../xpack/sql/plan/physical/EsQueryExec.java | 4 +- .../xpack/sql/plan/physical/LocalExec.java | 6 +- .../xpack/sql/plan/physical/Unexecutable.java | 5 +- .../xpack/sql/plugin/TextFormatterCursor.java | 3 +- .../sql/plugin/TransportSqlQueryAction.java | 26 ++-- .../xpack/sql/session/Cursor.java | 25 +++- .../xpack/sql/session/Cursors.java | 3 +- .../xpack/sql/session/EmptyCursor.java | 5 +- .../xpack/sql/session/EmptyExecutable.java | 5 +- .../xpack/sql/session/EmptyRowSet.java | 5 - .../xpack/sql/session/Executable.java | 7 +- .../ListCursor.java} | 38 +++-- .../xpack/sql/session/ListRowSet.java | 19 ++- .../xpack/sql/session/RowSet.java | 5 - .../sql/session/SingletonExecutable.java | 10 +- .../xpack/sql/session/SingletonRowSet.java | 5 - .../xpack/sql/session/SqlSession.java | 3 +- .../logical/command/sys/SysColumnsTests.java | 2 +- .../logical/command/sys/SysTablesTests.java | 2 +- .../logical/command/sys/SysTypesTests.java | 20 ++- .../ListCursorTests.java} | 27 ++-- 42 files changed, 378 insertions(+), 353 deletions(-) delete mode 100644 x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListRowSet.java rename x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/{execution/search/PagingListCursor.java => session/ListCursor.java} (64%) rename x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/{execution/search/PagingListCursorTests.java => session/ListCursorTests.java} (63%) diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java index 815c85b7fed..ca394bf11d8 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java @@ -23,8 +23,7 @@ import org.elasticsearch.xpack.sql.planner.PlanningException; import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue; import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.RowSet; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.stats.Metrics; import org.elasticsearch.xpack.sql.stats.QueryMetric; @@ -91,7 +90,7 @@ public class PlanExecutor { }, listener::onFailure)); } - public void sql(Configuration cfg, String sql, List params, ActionListener listener) { + public void sql(Configuration cfg, String sql, List params, ActionListener listener) { QueryMetric metric = QueryMetric.from(cfg.mode(), cfg.clientId()); metrics.total(metric); @@ -101,7 +100,7 @@ public class PlanExecutor { })); } - public void nextPage(Configuration cfg, Cursor cursor, ActionListener listener) { + public void nextPage(Configuration cfg, Cursor cursor, ActionListener listener) { QueryMetric metric = QueryMetric.from(cfg.mode(), cfg.clientId()); metrics.total(metric); metrics.paging(metric); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java index ee0bb216e6f..41b5e1199ef 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java @@ -27,7 +27,8 @@ import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractor; import org.elasticsearch.xpack.sql.querydsl.agg.Aggs; import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.RowSet; +import org.elasticsearch.xpack.sql.session.Rows; +import org.elasticsearch.xpack.sql.type.Schema; import org.elasticsearch.xpack.sql.util.StringUtils; import java.io.IOException; @@ -36,6 +37,8 @@ import java.util.BitSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Function; /** * Cursor for composite aggregation (GROUP BY). @@ -116,7 +119,7 @@ public class CompositeAggregationCursor implements Cursor { } @Override - public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { + public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { SearchSourceBuilder q; try { q = deserializeQuery(registry, nextQuery); @@ -135,21 +138,11 @@ public class CompositeAggregationCursor implements Cursor { client.search(search, new ActionListener() { @Override public void onResponse(SearchResponse r) { - try { - // retry - if (shouldRetryDueToEmptyPage(r)) { - CompositeAggregationCursor.updateCompositeAfterKey(r, search.source()); - client.search(search, this); - return; - } - - boolean hasAfterKey = updateCompositeAfterKey(r, query); - CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit, - hasAfterKey ? serializeQuery(query) : null, includeFrozen, indices); - listener.onResponse(rowSet); - } catch (Exception ex) { - listener.onFailure(ex); - } + handle(r, search.source(), ba -> new CompositeAggsRowSet(extractors, mask, r, limit, ba), + () -> client.search(search, this), + p -> listener.onResponse(p), + e -> listener.onFailure(e), + Schema.EMPTY, includeFrozen, indices); } @Override @@ -159,6 +152,39 @@ public class CompositeAggregationCursor implements Cursor { }); } + static void handle(SearchResponse response, SearchSourceBuilder source, Function makeRowSet, + Runnable retry, Consumer onPage, Consumer onFailure, + Schema schema, boolean includeFrozen, String[] indices) { + + // there are some results + if (response.getAggregations().asList().isEmpty() == false) { + // retry + if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) { + CompositeAggregationCursor.updateCompositeAfterKey(response, source); + retry.run(); + return; + } + + try { + boolean hasAfterKey = updateCompositeAfterKey(response, source); + byte[] queryAsBytes = hasAfterKey ? serializeQuery(source) : null; + CompositeAggsRowSet rowSet = makeRowSet.apply(queryAsBytes); + + Cursor next = rowSet.remainingData() == 0 + ? Cursor.EMPTY + : new CompositeAggregationCursor(queryAsBytes, rowSet.extractors(), rowSet.mask(), + rowSet.remainingData(), includeFrozen, indices); + onPage.accept(new Page(rowSet, next)); + } catch (Exception ex) { + onFailure.accept(ex); + } + } + // no results + else { + onPage.accept(Page.last(Rows.empty(schema))); + } + } + static boolean shouldRetryDueToEmptyPage(SearchResponse response) { CompositeAggregation composite = getComposite(response); // if there are no buckets but a next page, go fetch it instead of sending an empty response to the client diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java index 88b93359d0f..dd6b85279cb 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.sql.execution.search; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractor; -import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.session.RowSet; import java.util.BitSet; @@ -22,14 +21,11 @@ import static java.util.Collections.emptyList; class CompositeAggsRowSet extends ResultRowSet { private final List buckets; - - private final Cursor cursor; - + private final int remainingData; private final int size; private int row = 0; - CompositeAggsRowSet(List exts, BitSet mask, SearchResponse response, - int limit, byte[] next, boolean includeFrozen, String... indices) { + CompositeAggsRowSet(List exts, BitSet mask, SearchResponse response, int limit, byte[] next) { super(exts, mask); CompositeAggregation composite = CompositeAggregationCursor.getComposite(response); @@ -43,7 +39,7 @@ class CompositeAggsRowSet extends ResultRowSet { size = limit == -1 ? buckets.size() : Math.min(buckets.size(), limit); if (next == null) { - cursor = Cursor.EMPTY; + remainingData = 0; } else { // Compute remaining limit @@ -56,9 +52,9 @@ class CompositeAggsRowSet extends ResultRowSet { // however the Querier takes care of that and keeps making requests until either the query is invalid or at least one response // is returned. if (size == 0 || remainingLimit == 0) { - cursor = Cursor.EMPTY; + remainingData = 0; } else { - cursor = new CompositeAggregationCursor(next, exts, mask, remainingLimit, includeFrozen, indices); + remainingData = remainingLimit; } } } @@ -92,8 +88,7 @@ class CompositeAggsRowSet extends ResultRowSet { return size; } - @Override - public Cursor nextPageCursor() { - return cursor; + int remainingData() { + return remainingData; } -} +} \ No newline at end of file diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListRowSet.java deleted file mode 100644 index 73da15255f0..00000000000 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListRowSet.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.sql.execution.search; - -import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.ListRowSet; -import org.elasticsearch.xpack.sql.type.Schema; - -import java.util.List; - -class PagingListRowSet extends ListRowSet { - - private final int pageSize; - private final int columnCount; - private final Cursor cursor; - - PagingListRowSet(List> list, int columnCount, int pageSize) { - this(Schema.EMPTY, list, columnCount, pageSize); - } - - PagingListRowSet(Schema schema, List> list, int columnCount, int pageSize) { - super(schema, list); - this.columnCount = columnCount; - this.pageSize = Math.min(pageSize, list.size()); - this.cursor = list.size() > pageSize ? new PagingListCursor(list, columnCount, pageSize) : Cursor.EMPTY; - } - - @Override - public int size() { - return pageSize; - } - - @Override - public int columnCount() { - return columnCount; - } - - @Override - public Cursor nextPageCursor() { - return cursor; - } -} diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java index f38d4a4f7dc..9e0d4f3a691 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java @@ -20,7 +20,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.MultiBucketConsumerService; @@ -57,6 +56,8 @@ import org.elasticsearch.xpack.sql.querydsl.container.SearchHitFieldRef; import org.elasticsearch.xpack.sql.querydsl.container.TopHitsAggRef; import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.Cursor; +import org.elasticsearch.xpack.sql.session.Cursor.Page; +import org.elasticsearch.xpack.sql.session.ListCursor; import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SchemaRowSet; @@ -75,6 +76,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.singletonList; +import static org.elasticsearch.action.ActionListener.wrap; // TODO: add retry/back-off public class Querier { @@ -98,7 +100,7 @@ public class Querier { this.size = cfg.pageSize(); } - public void query(List output, QueryContainer query, String index, ActionListener listener) { + public void query(List output, QueryContainer query, String index, ActionListener listener) { // prepare the request SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, filter, size); // set query timeout @@ -152,22 +154,19 @@ public class Querier { * results back to the client. */ @SuppressWarnings("rawtypes") - class LocalAggregationSorterListener implements ActionListener { + class LocalAggregationSorterListener implements ActionListener { - private final ActionListener listener; + private final ActionListener listener; // keep the top N entries. private final AggSortingQueue data; private final AtomicInteger counter = new AtomicInteger(); private volatile Schema schema; - /** - * Match the default value for {@link MultiBucketConsumerService#MAX_BUCKET_SETTING} - */ - private static final int MAXIMUM_SIZE = 10_000; + private static final int MAXIMUM_SIZE = MultiBucketConsumerService.DEFAULT_MAX_BUCKETS; private final boolean noLimit; - LocalAggregationSorterListener(ActionListener listener, List> sortingColumns, int limit) { + LocalAggregationSorterListener(ActionListener listener, List> sortingColumns, int limit) { this.listener = listener; int size = MAXIMUM_SIZE; @@ -185,20 +184,26 @@ public class Querier { } @Override - public void onResponse(SchemaRowSet schemaRowSet) { - schema = schemaRowSet.schema(); - doResponse(schemaRowSet); - } + public void onResponse(Page page) { + // schema is set on the first page (as the rest don't hold the schema anymore) + if (schema == null) { + RowSet rowSet = page.rowSet(); + if (rowSet instanceof SchemaRowSet) { + schema = ((SchemaRowSet) rowSet).schema(); + } else { + onFailure(new SqlIllegalArgumentException("No schema found inside {}", rowSet.getClass())); + return; + } + } - private void doResponse(RowSet rowSet) { // 1. consume all pages received - consumeRowSet(rowSet); + consumeRowSet(page.rowSet()); - Cursor cursor = rowSet.nextPageCursor(); + Cursor cursor = page.next(); // 1a. trigger a next call if there's still data if (cursor != Cursor.EMPTY) { // trigger a next call - planExecutor.nextPage(cfg, cursor, ActionListener.wrap(this::doResponse, this::onFailure)); + planExecutor.nextPage(cfg, cursor, this); // make sure to bail out afterwards as we'll get called by a different thread return; } @@ -222,7 +227,7 @@ public class Querier { } private void sendResponse() { - listener.onResponse(new PagingListRowSet(schema, data.asList(), schema.size(), cfg.pageSize())); + listener.onResponse(ListCursor.of(schema, data.asList(), cfg.pageSize())); } @Override @@ -264,13 +269,13 @@ public class Querier { } }); - ImplicitGroupActionListener(ActionListener listener, Client client, Configuration cfg, List output, + ImplicitGroupActionListener(ActionListener listener, Client client, Configuration cfg, List output, QueryContainer query, SearchRequest request) { super(listener, client, cfg, output, query, request); } @Override - protected void handleResponse(SearchResponse response, ActionListener listener) { + protected void handleResponse(SearchResponse response, ActionListener listener) { Aggregations aggs = response.getAggregations(); if (aggs != null) { Aggregation agg = aggs.get(Aggs.ROOT_GROUP_NAME); @@ -297,10 +302,10 @@ public class Querier { for (int i = mask.nextSetBit(0); i >= 0; i = mask.nextSetBit(i + 1)) { values[index++] = extractors.get(i).extract(implicitGroup); } - listener.onResponse(Rows.singleton(schema, values)); + listener.onResponse(Page.last(Rows.singleton(schema, values))); } else if (buckets.isEmpty()) { - listener.onResponse(Rows.empty(schema)); + listener.onResponse(Page.last(Rows.empty(schema))); } else { throw new SqlIllegalArgumentException("Too many groups returned by the implicit group; expected 1, received {}", @@ -315,43 +320,21 @@ public class Querier { */ static class CompositeActionListener extends BaseAggActionListener { - CompositeActionListener(ActionListener listener, Client client, Configuration cfg, + CompositeActionListener(ActionListener listener, Client client, Configuration cfg, List output, QueryContainer query, SearchRequest request) { super(listener, client, cfg, output, query, request); } @Override - protected void handleResponse(SearchResponse response, ActionListener listener) { - // there are some results - if (response.getAggregations().asList().isEmpty() == false) { - - // retry - if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) { - CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); - client.search(request, this); - return; - } - - CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); - byte[] nextSearch; - try { - nextSearch = CompositeAggregationCursor.serializeQuery(request.source()); - } catch (Exception ex) { - listener.onFailure(ex); - return; - } - - listener.onResponse( - new SchemaCompositeAggsRowSet(schema, initBucketExtractors(response), mask, response, - query.sortingColumns().isEmpty() ? query.limit() : -1, - nextSearch, - query.shouldIncludeFrozen(), - request.indices())); - } - // no results - else { - listener.onResponse(Rows.empty(schema)); - } + protected void handleResponse(SearchResponse response, ActionListener listener) { + + CompositeAggregationCursor.handle(response, request.source(), + ba -> new SchemaCompositeAggsRowSet(schema, initBucketExtractors(response), mask, response, + query.sortingColumns().isEmpty() ? query.limit() : -1, ba), + () -> client.search(request, this), + p -> listener.onResponse(p), + e -> listener.onFailure(e), + schema, query.shouldIncludeFrozen(), request.indices()); } } @@ -360,7 +343,7 @@ public class Querier { final SearchRequest request; final BitSet mask; - BaseAggActionListener(ActionListener listener, Client client, Configuration cfg, List output, + BaseAggActionListener(ActionListener listener, Client client, Configuration cfg, List output, QueryContainer query, SearchRequest request) { super(listener, client, cfg, output); @@ -425,7 +408,7 @@ public class Querier { private final BitSet mask; private final boolean multiValueFieldLeniency; - ScrollActionListener(ActionListener listener, Client client, Configuration cfg, + ScrollActionListener(ActionListener listener, Client client, Configuration cfg, List output, QueryContainer query) { super(listener, client, cfg, output); this.query = query; @@ -434,9 +417,7 @@ public class Querier { } @Override - protected void handleResponse(SearchResponse response, ActionListener listener) { - SearchHit[] hits = response.getHits().getHits(); - + protected void handleResponse(SearchResponse response, ActionListener listener) { // create response extractors for the first time List> refs = query.fields(); @@ -445,30 +426,10 @@ public class Querier { exts.add(createExtractor(ref.v1())); } - // there are some results - if (hits.length > 0) { - String scrollId = response.getScrollId(); - SchemaSearchHitRowSet hitRowSet = new SchemaSearchHitRowSet(schema, exts, mask, hits, query.limit(), scrollId); - - // if there's an id, try to setup next scroll - if (scrollId != null && - // is all the content already retrieved? - (Boolean.TRUE.equals(response.isTerminatedEarly()) - || response.getHits().getTotalHits().value == hits.length - || hitRowSet.isLimitReached())) { - // if so, clear the scroll - clear(response.getScrollId(), ActionListener.wrap( - succeeded -> listener.onResponse(new SchemaSearchHitRowSet(schema, exts, mask, hits, query.limit(), null)), - listener::onFailure)); - } else { - listener.onResponse(hitRowSet); - } - } - // no hits - else { - clear(response.getScrollId(), ActionListener.wrap(succeeded -> listener.onResponse(Rows.empty(schema)), - listener::onFailure)); - } + ScrollCursor.handle(response, () -> new SchemaSearchHitRowSet(schema, exts, mask, query.limit(), response), + p -> listener.onResponse(p), + p -> clear(response.getScrollId(), wrap(success -> listener.onResponse(p), listener::onFailure)), + schema); } private HitExtractor createExtractor(FieldExtraction ref) { @@ -514,14 +475,14 @@ public class Querier { */ abstract static class BaseActionListener implements ActionListener { - final ActionListener listener; + final ActionListener listener; final Client client; final Configuration cfg; final TimeValue keepAlive; final Schema schema; - BaseActionListener(ActionListener listener, Client client, Configuration cfg, List output) { + BaseActionListener(ActionListener listener, Client client, Configuration cfg, List output) { this.listener = listener; this.client = client; @@ -545,7 +506,7 @@ public class Querier { } } - protected abstract void handleResponse(SearchResponse response, ActionListener listener); + protected abstract void handleResponse(SearchResponse response, ActionListener listener); // clean-up the scroll in case of exception protected final void cleanup(SearchResponse response, Exception ex) { diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java index 3ec4ff6b114..7eeb8b28f15 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java @@ -22,11 +22,8 @@ class SchemaCompositeAggsRowSet extends CompositeAggsRowSet implements SchemaRow private final Schema schema; - SchemaCompositeAggsRowSet(Schema schema, List exts, BitSet mask, SearchResponse response, int limitAggs, - byte[] next, - boolean includeFrozen, - String... indices) { - super(exts, mask, response, limitAggs, next, includeFrozen, indices); + SchemaCompositeAggsRowSet(Schema schema, List exts, BitSet mask, SearchResponse r, int limitAggs, byte[] next) { + super(exts, mask, r, limitAggs, next); this.schema = schema; } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaSearchHitRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaSearchHitRowSet.java index aa5c57aab60..7ba7a06fd8a 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaSearchHitRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaSearchHitRowSet.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.xpack.sql.execution.search; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.type.Schema; @@ -21,8 +21,8 @@ import java.util.List; class SchemaSearchHitRowSet extends SearchHitRowSet implements SchemaRowSet { private final Schema schema; - SchemaSearchHitRowSet(Schema schema, List exts, BitSet mask, SearchHit[] hits, int limitHits, String scrollId) { - super(exts, mask, hits, limitHits, scrollId); + SchemaSearchHitRowSet(Schema schema, List exts, BitSet mask, int limitHits, SearchResponse response) { + super(exts, mask, limitHits, response); this.schema = schema; } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java index af57126cc56..55f78db4073 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java @@ -14,18 +14,25 @@ import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.RowSet; +import org.elasticsearch.xpack.sql.session.Rows; +import org.elasticsearch.xpack.sql.type.Schema; import java.io.IOException; import java.util.BitSet; import java.util.List; import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.elasticsearch.action.ActionListener.wrap; public class ScrollCursor implements Cursor { @@ -83,29 +90,48 @@ public class ScrollCursor implements Cursor { return limit; } @Override - public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { - log.trace("About to execute scroll query {}", scrollId); + public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { + if (log.isTraceEnabled()) { + log.trace("About to execute scroll query {}", scrollId); + } SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(cfg.pageTimeout()); - client.searchScroll(request, ActionListener.wrap((SearchResponse response) -> { - SearchHitRowSet rowSet = new SearchHitRowSet(extractors, mask, response.getHits().getHits(), - limit, response.getScrollId()); - if (rowSet.nextPageCursor() == Cursor.EMPTY ) { - // we are finished with this cursor, let's clean it before continuing - clear(cfg, client, ActionListener.wrap(success -> listener.onResponse(rowSet), listener::onFailure)); - } else { - listener.onResponse(rowSet); - } + client.searchScroll(request, wrap(response -> { + handle(response, () -> new SearchHitRowSet(extractors, mask, limit, response), + p -> listener.onResponse(p), + p -> clear(cfg, client, wrap(success -> listener.onResponse(p), listener::onFailure)), + Schema.EMPTY); }, listener::onFailure)); } @Override public void clear(Configuration cfg, Client client, ActionListener listener) { - cleanCursor(client, scrollId, - ActionListener.wrap( + cleanCursor(client, scrollId, wrap( clearScrollResponse -> listener.onResponse(clearScrollResponse.isSucceeded()), listener::onFailure)); } + + static void handle(SearchResponse response, Supplier makeRowHit, Consumer onPage, Consumer clearScroll, + Schema schema) { + SearchHit[] hits = response.getHits().getHits(); + // clean-up + if (hits.length > 0) { + SearchHitRowSet rowSet = makeRowHit.get(); + Tuple nextScrollData = rowSet.nextScrollData(); + + if (nextScrollData == null) { + // no more data, let's clean the scroll before continuing + clearScroll.accept(Page.last(rowSet)); + } else { + Cursor next = new ScrollCursor(nextScrollData.v1(), rowSet.extractors(), rowSet.mask(), nextScrollData.v2()); + onPage.accept(new Page(rowSet, next)); + } + } + // no-hits + else { + clearScroll.accept(Page.last(Rows.empty(schema))); + } + } @Override public boolean equals(Object obj) { diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java index 83503541385..95316e108d1 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java @@ -5,11 +5,13 @@ */ package org.elasticsearch.xpack.sql.execution.search; +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; -import org.elasticsearch.xpack.sql.session.Cursor; import java.util.ArrayList; import java.util.Arrays; @@ -29,18 +31,19 @@ import java.util.Set; class SearchHitRowSet extends ResultRowSet { private final SearchHit[] hits; private final Map> flatInnerHits = new HashMap<>(); - private final Cursor cursor; private final Set innerHits = new LinkedHashSet<>(); private final String innerHit; private final int size; private final int[] indexPerLevel; + private final Tuple nextScrollData; + private int row = 0; - SearchHitRowSet(List exts, BitSet mask, SearchHit[] hits, int limit, String scrollId) { + SearchHitRowSet(List exts, BitSet mask, int limit, SearchResponse response) { super(exts, mask); - this.hits = hits; + this.hits = response.getHits().getHits(); // Since the results might contain nested docs, the iteration is similar to that of Aggregation // namely it discovers the nested docs and then, for iteration, increments the deepest level first @@ -81,24 +84,30 @@ class SearchHitRowSet extends ResultRowSet { indexPerLevel = new int[maxDepth + 1]; this.innerHit = innerHit; + String scrollId = response.getScrollId(); + if (scrollId == null) { /* SearchResponse can contain a null scroll when you start a * scroll but all results fit in the first page. */ - cursor = Cursor.EMPTY; + nextScrollData = null; } else { + TotalHits totalHits = response.getHits().getTotalHits(); + // compute remaining limit (only if the limit is specified - that is, positive). int remainingLimit = limit < 0 ? limit : limit - size; // if the computed limit is zero, or the size is zero it means either there's nothing left or the limit has been reached - if (size == 0 || remainingLimit == 0) { - cursor = Cursor.EMPTY; + if (size == 0 || remainingLimit == 0 + // or the scroll has ended + || totalHits != null && totalHits.value == hits.length) { + nextScrollData = null; } else { - cursor = new ScrollCursor(scrollId, extractors(), mask, remainingLimit); + nextScrollData = new Tuple<>(scrollId, remainingLimit); } } } protected boolean isLimitReached() { - return cursor == Cursor.EMPTY; + return nextScrollData == null; } @Override @@ -131,8 +140,8 @@ class SearchHitRowSet extends ResultRowSet { int endOfPath = entry.getKey().lastIndexOf('_'); if (endOfPath >= 0 && entry.getKey().substring(0, endOfPath).equals(path)) { SearchHit[] h = entry.getValue().getHits(); - for (int i = 0; i < h.length; i++) { - lhm.put(h[i].getNestedIdentity().getOffset(), h[i]); + for (SearchHit element : h) { + lhm.put(element.getNestedIdentity().getOffset(), element); } } } @@ -146,7 +155,7 @@ class SearchHitRowSet extends ResultRowSet { } private class NestedHitOffsetComparator implements Comparator { - @Override + @Override public int compare(SearchHit sh1, SearchHit sh2) { if (sh1 == null && sh2 == null) { return 0; @@ -210,8 +219,7 @@ class SearchHitRowSet extends ResultRowSet { return size; } - @Override - public Cursor nextPageCursor() { - return cursor; + Tuple nextScrollData() { + return nextScrollData; } } \ No newline at end of file diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java index 53a485a3b05..9abe6fef3d4 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java @@ -7,11 +7,11 @@ package org.elasticsearch.xpack.sql.plan.logical; import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Executable; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.NodeInfo; +import org.elasticsearch.xpack.sql.tree.Source; import java.util.List; import java.util.Objects; @@ -52,7 +52,7 @@ public class LocalRelation extends LogicalPlan implements Executable { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { executable.execute(session, listener); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java index 72ae456a33b..0b976634586 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java @@ -7,7 +7,11 @@ package org.elasticsearch.xpack.sql.plan.logical.command; import org.elasticsearch.xpack.sql.expression.FieldAttribute; import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Executable; +import org.elasticsearch.xpack.sql.session.ListCursor; +import org.elasticsearch.xpack.sql.session.Rows; +import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.type.DataType; import org.elasticsearch.xpack.sql.type.EsField; @@ -51,4 +55,8 @@ public abstract class Command extends LogicalPlan implements Executable { private FieldAttribute field(String name, EsField field) { return new FieldAttribute(source(), name, field); } + + protected Page of(SqlSession session, List> values) { + return ListCursor.of(Rows.schema(output()), values, session.configuration().pageSize()); + } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java index f4aa378bbce..eda730e8adb 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java @@ -12,13 +12,13 @@ import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.sql.rule.RuleExecutor.Batch; import org.elasticsearch.xpack.sql.rule.RuleExecutor.ExecutionInfo; import org.elasticsearch.xpack.sql.rule.RuleExecutor.Transformation; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.Node; import org.elasticsearch.xpack.sql.tree.NodeInfo; import org.elasticsearch.xpack.sql.tree.NodeUtils; +import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.type.KeywordEsField; import org.elasticsearch.xpack.sql.util.Graphviz; @@ -75,7 +75,7 @@ public class Debug extends Command { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { switch (type) { case ANALYZED: session.debugAnalyzedPlan(plan, wrap(i -> handleInfo(i, listener), listener::onFailure)); @@ -90,7 +90,7 @@ public class Debug extends Command { } @SuppressWarnings({ "rawtypes", "unchecked" }) - private void handleInfo(ExecutionInfo info, ActionListener listener) { + private void handleInfo(ExecutionInfo info, ActionListener listener) { String planString = null; if (format == Format.TEXT) { @@ -135,7 +135,7 @@ public class Debug extends Command { } } - listener.onResponse(Rows.singleton(output(), planString)); + listener.onResponse(Page.last(Rows.singleton(output(), planString))); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java index c6904a87f3f..d3eac1cd6bb 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java @@ -13,11 +13,11 @@ import org.elasticsearch.xpack.sql.plan.QueryPlan; import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.sql.planner.Planner; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.NodeInfo; +import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.type.KeywordEsField; import org.elasticsearch.xpack.sql.util.Graphviz; @@ -85,10 +85,10 @@ public class Explain extends Command { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { if (type == Type.PARSED) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, plan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, plan)))); return; } @@ -96,7 +96,7 @@ public class Explain extends Command { session.analyzedPlan(plan, verify, wrap(analyzedPlan -> { if (type == Type.ANALYZED) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, analyzedPlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, analyzedPlan)))); return; } @@ -105,25 +105,25 @@ public class Explain extends Command { if (verify) { session.optimizedPlan(analyzedPlan, wrap(optimizedPlan -> { if (type == Type.OPTIMIZED) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, optimizedPlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, optimizedPlan)))); return; } PhysicalPlan mappedPlan = planner.mapPlan(optimizedPlan, verify); if (type == Type.MAPPED) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, mappedPlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, mappedPlan)))); return; } PhysicalPlan executablePlan = planner.foldPlan(mappedPlan, verify); if (type == Type.EXECUTABLE) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, executablePlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, executablePlan)))); return; } // Type.All - listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan, - mappedPlan, executablePlan))); + listener.onResponse(Page.last( + Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan, mappedPlan, executablePlan)))); }, listener::onFailure)); } @@ -133,14 +133,14 @@ public class Explain extends Command { if (session.verifier().verifyFailures(analyzedPlan).isEmpty()) { session.optimizedPlan(analyzedPlan, wrap(optimizedPlan -> { if (type == Type.OPTIMIZED) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, optimizedPlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, optimizedPlan)))); return; } PhysicalPlan mappedPlan = planner.mapPlan(optimizedPlan, verify); if (type == Type.MAPPED) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, mappedPlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, mappedPlan)))); return; } @@ -148,30 +148,30 @@ public class Explain extends Command { PhysicalPlan executablePlan = planner.foldPlan(mappedPlan, verify); if (type == Type.EXECUTABLE) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, executablePlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, executablePlan)))); return; } - listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan, - mappedPlan, executablePlan))); + listener.onResponse(Page.last(Rows.singleton(output(), + printPlans(format, plan, analyzedPlan, optimizedPlan, mappedPlan, executablePlan)))); return; } // mapped failed if (type != Type.ALL) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, mappedPlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, mappedPlan)))); return; } - listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan, - mappedPlan, null))); + listener.onResponse(Page + .last(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, optimizedPlan, mappedPlan, null)))); }, listener::onFailure)); // cannot continue } else { if (type != Type.ALL) { - listener.onResponse(Rows.singleton(output(), formatPlan(format, analyzedPlan))); + listener.onResponse(Page.last(Rows.singleton(output(), formatPlan(format, analyzedPlan)))); } else { - listener.onResponse(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, null, null, null))); + listener.onResponse(Page.last(Rows.singleton(output(), printPlans(format, plan, analyzedPlan, null, null, null)))); } } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java index 7cddc3fc0a7..33643fa0f9f 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java @@ -9,8 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.FieldAttribute; import org.elasticsearch.xpack.sql.expression.predicate.regex.LikePattern; -import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.NodeInfo; import org.elasticsearch.xpack.sql.tree.Source; @@ -61,7 +60,7 @@ public class ShowColumns extends Command { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { String idx = index != null ? index : (pattern != null ? pattern.asIndexNameWildcard() : "*"); String regex = pattern != null ? pattern.asJavaRegex() : null; @@ -73,7 +72,7 @@ public class ShowColumns extends Command { rows = new ArrayList<>(); fillInRows(indexResult.get().mapping(), null, rows); } - listener.onResponse(Rows.of(output(), rows)); + listener.onResponse(of(session, rows)); }, listener::onFailure)); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java index f0e5d3fced0..e30c252fe32 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java @@ -11,11 +11,10 @@ import org.elasticsearch.xpack.sql.expression.FieldAttribute; import org.elasticsearch.xpack.sql.expression.function.FunctionDefinition; import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry; import org.elasticsearch.xpack.sql.expression.predicate.regex.LikePattern; -import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.SqlSession; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.NodeInfo; +import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.type.KeywordEsField; import java.util.Collection; @@ -50,11 +49,11 @@ public class ShowFunctions extends Command { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { FunctionRegistry registry = session.functionRegistry(); Collection functions = registry.listFunctions(pattern != null ? pattern.asJavaRegex() : null); - listener.onResponse(Rows.of(output(), functions.stream() + listener.onResponse(of(session, functions.stream() .map(f -> asList(f.name(), f.type().name())) .collect(toList()))); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java index 9d684b3ca45..6ebcfb2b16b 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java @@ -8,11 +8,11 @@ package org.elasticsearch.xpack.sql.plan.logical.command; import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.FieldAttribute; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.NodeInfo; +import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.type.KeywordEsField; import java.util.List; @@ -36,8 +36,8 @@ public class ShowSchemas extends Command { } @Override - public void execute(SqlSession session, ActionListener listener) { - listener.onResponse(Rows.empty(output())); + public void execute(SqlSession session, ActionListener listener) { + listener.onResponse(Page.last(Rows.empty(output()))); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java index 8efc7d84377..4cdeae3ef50 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java @@ -9,8 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.analysis.index.IndexResolver.IndexType; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.predicate.regex.LikePattern; -import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.NodeInfo; import org.elasticsearch.xpack.sql.tree.Source; @@ -54,7 +53,7 @@ public class ShowTables extends Command { } @Override - public final void execute(SqlSession session, ActionListener listener) { + public final void execute(SqlSession session, ActionListener listener) { String idx = index != null ? index : (pattern != null ? pattern.asIndexNameWildcard() : "*"); String regex = pattern != null ? pattern.asJavaRegex() : null; @@ -63,7 +62,7 @@ public class ShowTables extends Command { IndexType.VALID_INCLUDE_FROZEN : IndexType.VALID_REGULAR; session.indexResolver().resolveNames(idx, regex, withFrozen, ActionListener.wrap(result -> { - listener.onResponse(Rows.of(output(), result.stream() + listener.onResponse(of(session, result.stream() .map(t -> asList(t.name(), t.type().toSql(), t.type().toNative())) .collect(toList()))); }, listener::onFailure)); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumns.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumns.java index 16d6eae924e..a1bb62b0021 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumns.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumns.java @@ -13,8 +13,8 @@ import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.predicate.regex.LikePattern; import org.elasticsearch.xpack.sql.plan.logical.command.Command; import org.elasticsearch.xpack.sql.proto.Mode; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.NodeInfo; import org.elasticsearch.xpack.sql.tree.Source; @@ -97,14 +97,14 @@ public class SysColumns extends Command { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { Mode mode = session.configuration().mode(); List output = output(mode == Mode.ODBC); String cluster = session.indexResolver().clusterName(); // bail-out early if the catalog is present but differs if (Strings.hasText(catalog) && cluster.equals(catalog) == false) { - listener.onResponse(Rows.empty(output)); + listener.onResponse(Page.last(Rows.empty(output))); return; } @@ -125,7 +125,7 @@ public class SysColumns extends Command { fillInRows(cluster, esIndex.name(), esIndex.mapping(), null, rows, columnMatcher, mode); } - listener.onResponse(Rows.of(output, rows)); + listener.onResponse(of(session, rows)); }, listener::onFailure)); } // otherwise use a merged mapping @@ -138,7 +138,7 @@ public class SysColumns extends Command { fillInRows(cluster, indexName, esIndex.mapping(), null, rows, columnMatcher, mode); } - listener.onResponse(Rows.of(output, rows)); + listener.onResponse(of(session, rows)); }, listener::onFailure)); } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTables.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTables.java index 111b392adb6..a3b8f181741 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTables.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTables.java @@ -11,8 +11,8 @@ import org.elasticsearch.xpack.sql.analysis.index.IndexResolver.IndexType; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.predicate.regex.LikePattern; import org.elasticsearch.xpack.sql.plan.logical.command.Command; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.NodeInfo; import org.elasticsearch.xpack.sql.tree.Source; @@ -70,7 +70,7 @@ public class SysTables extends Command { } @Override - public final void execute(SqlSession session, ActionListener listener) { + public final void execute(SqlSession session, ActionListener listener) { String cluster = session.indexResolver().clusterName(); // first check if where dealing with ODBC enumeration @@ -85,7 +85,7 @@ public class SysTables extends Command { Object[] enumeration = new Object[10]; // send only the cluster, everything else null enumeration[0] = cluster; - listener.onResponse(Rows.singleton(output(), enumeration)); + listener.onResponse(Page.last(Rows.singleton(output(), enumeration))); return; } } @@ -111,7 +111,7 @@ public class SysTables extends Command { } values.sort(Comparator.comparing(l -> l.get(3).toString())); - listener.onResponse(Rows.of(output(), values)); + listener.onResponse(of(session, values)); return; } } @@ -122,7 +122,7 @@ public class SysTables extends Command { // if the catalog doesn't match, don't return any results if (cRegex != null && !Pattern.matches(cRegex, cluster)) { - listener.onResponse(Rows.empty(output())); + listener.onResponse(Page.last(Rows.empty(output()))); return; } @@ -141,7 +141,7 @@ public class SysTables extends Command { } session.indexResolver().resolveNames(idx, regex, tableTypes, ActionListener.wrap(result -> listener.onResponse( - Rows.of(output(), result.stream() + of(session, result.stream() // sort by type (which might be legacy), then by name .sorted(Comparator. comparing(i -> legacyName(i.type())) .thenComparing(Comparator.comparing(i -> i.name()))) diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypes.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypes.java index 2112128b41b..95ba2346c3e 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypes.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypes.java @@ -8,8 +8,7 @@ package org.elasticsearch.xpack.sql.plan.logical.command.sys; import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.plan.logical.command.Command; -import org.elasticsearch.xpack.sql.session.Rows; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.NodeInfo; import org.elasticsearch.xpack.sql.tree.Source; @@ -72,7 +71,7 @@ public class SysTypes extends Command { } @Override - public final void execute(SqlSession session, ActionListener listener) { + public final void execute(SqlSession session, ActionListener listener) { Stream values = Stream.of(DataType.values()); if (type.intValue() != 0) { values = values.filter(t -> type.equals(t.sqlType.getVendorTypeNumber())); @@ -110,7 +109,7 @@ public class SysTypes extends Command { )) .collect(toList()); - listener.onResponse(Rows.of(output(), rows)); + listener.onResponse(of(session, rows)); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java index f1fccb7e2c4..43a7bfac462 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java @@ -8,14 +8,16 @@ package org.elasticsearch.xpack.sql.plan.physical; import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.plan.logical.command.Command; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.SqlSession; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.NodeInfo; +import org.elasticsearch.xpack.sql.tree.Source; import java.util.List; import java.util.Objects; +import static org.elasticsearch.action.ActionListener.wrap; + public class CommandExec extends LeafExec { private final Command command; @@ -35,8 +37,8 @@ public class CommandExec extends LeafExec { } @Override - public void execute(SqlSession session, ActionListener listener) { - command.execute(session, listener); + public void execute(SqlSession session, ActionListener listener) { + command.execute(session, wrap(listener::onResponse, listener::onFailure)); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java index 6fa87ca90bb..6e132fb6871 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java @@ -9,7 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.execution.search.Querier; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.querydsl.container.QueryContainer; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.NodeInfo; import org.elasticsearch.xpack.sql.tree.Source; @@ -53,7 +53,7 @@ public class EsQueryExec extends LeafExec { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { Querier scroller = new Querier(session); scroller.query(output, queryContainer, index, listener); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java index cce19411465..c0adb1a9865 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java @@ -7,12 +7,12 @@ package org.elasticsearch.xpack.sql.plan.physical; import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.EmptyExecutable; import org.elasticsearch.xpack.sql.session.Executable; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; -import org.elasticsearch.xpack.sql.tree.Source; import org.elasticsearch.xpack.sql.tree.NodeInfo; +import org.elasticsearch.xpack.sql.tree.Source; import java.util.List; import java.util.Objects; @@ -45,7 +45,7 @@ public class LocalExec extends LeafExec { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { executable.execute(session, listener); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java index e1b08b3e492..a32d2f88990 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java @@ -7,15 +7,16 @@ package org.elasticsearch.xpack.sql.plan.physical; import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.planner.PlanningException; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Executable; -import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; // this is mainly a marker interface to validate a plan before being executed public interface Unexecutable extends Executable { - default void execute(SqlSession session, ActionListener listener) { + @Override + default void execute(SqlSession session, ActionListener listener) { throw new PlanningException("Current plan {} is not executable", this); } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormatterCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormatterCursor.java index 4ab1d77fe21..3c19a497565 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormatterCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormatterCursor.java @@ -13,7 +13,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xpack.sql.action.BasicFormatter; import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.RowSet; import java.io.IOException; import java.util.Objects; @@ -59,7 +58,7 @@ public class TextFormatterCursor implements Cursor { } @Override - public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { + public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { delegate.nextPage(cfg, client, registry, listener); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java index 3e9c30f49b4..35393e2a229 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.security.SecurityContext; +import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import org.elasticsearch.xpack.sql.action.SqlQueryAction; import org.elasticsearch.xpack.sql.action.SqlQueryRequest; import org.elasticsearch.xpack.sql.action.SqlQueryResponse; @@ -25,6 +26,7 @@ import org.elasticsearch.xpack.sql.execution.PlanExecutor; import org.elasticsearch.xpack.sql.proto.ColumnInfo; import org.elasticsearch.xpack.sql.proto.Mode; import org.elasticsearch.xpack.sql.session.Configuration; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Cursors; import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SchemaRowSet; @@ -71,20 +73,26 @@ public class TransportSqlQueryAction extends HandledTransportAction listener.onResponse(createResponse(request, rowSet)), listener::onFailure)); + wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), listener::onFailure)); } else { planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()), - wrap(rowSet -> listener.onResponse(createResponse(request.mode(), request.columnar(), rowSet, null)), + wrap(p -> listener.onResponse(createResponse(request.mode(), request.columnar(), null, p)), listener::onFailure)); } } - static SqlQueryResponse createResponse(SqlQueryRequest request, SchemaRowSet rowSet) { + static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) { + RowSet rset = page.rowSet(); + if ((rset instanceof SchemaRowSet) == false) { + throw new SqlIllegalArgumentException("No schema found inside {}", rset.getClass()); + } + SchemaRowSet rowSet = (SchemaRowSet) rset; + List columns = new ArrayList<>(rowSet.columnCount()); for (Schema.Entry entry : rowSet.schema()) { if (Mode.isDriver(request.mode())) { @@ -94,22 +102,22 @@ public class TransportSqlQueryAction extends HandledTransportAction columns) { + static SqlQueryResponse createResponse(Mode mode, boolean columnar, List header, Page page) { List> rows = new ArrayList<>(); - rowSet.forEachRow(rowView -> { + page.rowSet().forEachRow(rowView -> { List row = new ArrayList<>(rowView.columnCount()); rowView.forEachColumn(row::add); rows.add(unmodifiableList(row)); }); return new SqlQueryResponse( - Cursors.encodeToString(Version.CURRENT, rowSet.nextPageCursor()), + Cursors.encodeToString(Version.CURRENT, page.next()), mode, columnar, - columns, + header, rows); } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java index ccb4a7cdc40..618b417b0d5 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java @@ -14,12 +14,35 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; * Information required to access the next page of response. */ public interface Cursor extends NamedWriteable { + + class Page { + private final RowSet rowSet; + private final Cursor next; + + public Page(RowSet rowSet, Cursor next) { + this.rowSet = rowSet; + this.next = next; + } + + public RowSet rowSet() { + return rowSet; + } + + public Cursor next() { + return next; + } + + public static Page last(RowSet rowSet) { + return new Page(rowSet, EMPTY); + } + } + Cursor EMPTY = EmptyCursor.INSTANCE; /** * Request the next page of data. */ - void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener); + void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener); /** * Cleans the resources associated with the cursor diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java index f268754e704..f1d18a59736 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import org.elasticsearch.xpack.sql.execution.search.CompositeAggregationCursor; -import org.elasticsearch.xpack.sql.execution.search.PagingListCursor; import org.elasticsearch.xpack.sql.execution.search.ScrollCursor; import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractors; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors; @@ -49,7 +48,7 @@ public final class Cursors { entries.add(new NamedWriteableRegistry.Entry(Cursor.class, ScrollCursor.NAME, ScrollCursor::new)); entries.add(new NamedWriteableRegistry.Entry(Cursor.class, CompositeAggregationCursor.NAME, CompositeAggregationCursor::new)); entries.add(new NamedWriteableRegistry.Entry(Cursor.class, TextFormatterCursor.NAME, TextFormatterCursor::new)); - entries.add(new NamedWriteableRegistry.Entry(Cursor.class, PagingListCursor.NAME, PagingListCursor::new)); + entries.add(new NamedWriteableRegistry.Entry(Cursor.class, ListCursor.NAME, ListCursor::new)); // plus all their dependencies entries.addAll(Processors.getNamedWriteables()); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java index fd9c63438a8..9f95b904940 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import java.io.IOException; @@ -31,8 +32,8 @@ class EmptyCursor implements Cursor { } @Override - public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { - throw new IllegalArgumentException("there is no next page"); + public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { + throw new SqlIllegalArgumentException("there is no next page"); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java index 09e0d3ac2a3..93a7e51c39b 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.sql.session; import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import java.util.List; import java.util.Objects; @@ -25,8 +26,8 @@ public class EmptyExecutable implements Executable { } @Override - public void execute(SqlSession session, ActionListener listener) { - listener.onResponse(Rows.empty(output)); + public void execute(SqlSession session, ActionListener listener) { + listener.onResponse(Page.last(Rows.empty(output))); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSet.java index 9237850998a..9b6fa2aa3a0 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSet.java @@ -39,11 +39,6 @@ class EmptyRowSet extends AbstractRowSet implements SchemaRowSet { return 0; } - @Override - public Cursor nextPageCursor() { - return Cursor.EMPTY; - } - @Override public Schema schema() { return schema; diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java index dbc16317029..d1d78194ebe 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java @@ -5,14 +5,15 @@ */ package org.elasticsearch.xpack.sql.session; -import java.util.List; - import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; +import org.elasticsearch.xpack.sql.session.Cursor.Page; + +import java.util.List; public interface Executable { List output(); - void execute(SqlSession session, ActionListener listener); + void execute(SqlSession session, ActionListener listener); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListCursor.java similarity index 64% rename from x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursor.java rename to x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListCursor.java index 5a318eaa31f..7e20abc31de 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListCursor.java @@ -4,16 +4,14 @@ * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.sql.execution.search; +package org.elasticsearch.xpack.sql.session; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.xpack.sql.session.Configuration; -import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.RowSet; +import org.elasticsearch.xpack.sql.type.Schema; import java.io.IOException; import java.util.List; @@ -21,7 +19,7 @@ import java.util.Objects; import static java.util.Collections.emptyList; -public class PagingListCursor implements Cursor { +public class ListCursor implements Cursor { public static final String NAME = "p"; @@ -29,14 +27,14 @@ public class PagingListCursor implements Cursor { private final int columnCount; private final int pageSize; - PagingListCursor(List> data, int columnCount, int pageSize) { + public ListCursor(List> data, int pageSize, int columnCount) { this.data = data; this.columnCount = columnCount; this.pageSize = pageSize; } @SuppressWarnings("unchecked") - public PagingListCursor(StreamInput in) throws IOException { + public ListCursor(StreamInput in) throws IOException { data = (List>) in.readGenericValue(); columnCount = in.readVInt(); pageSize = in.readVInt(); @@ -66,11 +64,27 @@ public class PagingListCursor implements Cursor { return pageSize; } - @Override - public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { - // the check is really a safety measure since the page initialization handles it already (by returning an empty cursor) + public static Page of(Schema schema, List> data, int pageSize) { + return of(schema, data, pageSize, schema.size()); + } + + // NB: private since the columnCount is for public cases inferred by the columnCount + // only on the next-page the schema becomes null however that's an internal detail hence + // why this method is not exposed + private static Page of(Schema schema, List> data, int pageSize, int columnCount) { List> nextData = data.size() > pageSize ? data.subList(pageSize, data.size()) : emptyList(); - listener.onResponse(new PagingListRowSet(nextData, columnCount, pageSize)); + Cursor next = nextData.isEmpty() + ? Cursor.EMPTY + : new ListCursor(nextData, pageSize, columnCount); + List> currData = data.isEmpty() || pageSize == 0 + ? emptyList() + : data.size() == pageSize ? data : data.subList(0, Math.min(pageSize, data.size())); + return new Page(new ListRowSet(schema, currData, columnCount), next); + } + + @Override + public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener listener) { + listener.onResponse(of(Schema.EMPTY, data, pageSize, columnCount)); } @Override @@ -93,7 +107,7 @@ public class PagingListCursor implements Cursor { return false; } - PagingListCursor other = (PagingListCursor) obj; + ListCursor other = (ListCursor) obj; return Objects.equals(pageSize, other.pageSize) && Objects.equals(columnCount, other.columnCount) && Objects.equals(data, other.data); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSet.java index 0122d333f7e..6bbbbaa462d 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSet.java @@ -13,13 +13,20 @@ public class ListRowSet extends AbstractRowSet implements SchemaRowSet { private final Schema schema; private final List> list; + private final int columnCount; private int pos = 0; - protected ListRowSet(Schema schema, List> list) { + ListRowSet(Schema schema, List> list) { + this(schema, list, schema.size()); + } + + ListRowSet(Schema schema, List> list, int columnCount) { this.schema = schema; + this.columnCount = columnCount; this.list = list; } + @Override protected boolean doHasCurrent() { return pos < size(); @@ -49,13 +56,13 @@ public class ListRowSet extends AbstractRowSet implements SchemaRowSet { return list.size(); } - @Override - public Cursor nextPageCursor() { - return Cursor.EMPTY; - } - @Override public Schema schema() { return schema; } + + @Override + public int columnCount() { + return columnCount; + } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java index 38a22ff73f1..93d6745dc65 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java @@ -22,11 +22,6 @@ public interface RowSet extends RowView { void reset(); - /** - * The key used by PlanExecutor#nextPage to fetch the next page. - */ - Cursor nextPageCursor(); - default void forEachRow(Consumer action) { for (boolean hasRows = hasCurrentRow(); hasRows; hasRows = advanceRow()) { action.accept(this); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java index 8a526fac6df..d86ac5fe008 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java @@ -7,18 +7,20 @@ package org.elasticsearch.xpack.sql.session; import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.util.Check; -import java.util.Collections; import java.util.List; +import static java.util.Collections.emptyList; + public class SingletonExecutable implements Executable { private final List output; private final Object[] values; public SingletonExecutable() { - this(Collections.emptyList()); + this(emptyList()); } public SingletonExecutable(List output, Object... values) { @@ -33,8 +35,8 @@ public class SingletonExecutable implements Executable { } @Override - public void execute(SqlSession session, ActionListener listener) { - listener.onResponse(Rows.singleton(output, values)); + public void execute(SqlSession session, ActionListener listener) { + listener.onResponse(Page.last(Rows.singleton(output, values))); } @Override diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java index c8a4e5eddfb..649caafc9be 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java @@ -43,11 +43,6 @@ class SingletonRowSet extends AbstractRowSet implements SchemaRowSet { return 1; } - @Override - public Cursor nextPageCursor() { - return Cursor.EMPTY; - } - @Override public Schema schema() { return schema; diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java index 6a5b5bd2ae5..023b443850c 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java @@ -26,6 +26,7 @@ import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.sql.planner.Planner; import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue; import org.elasticsearch.xpack.sql.rule.RuleExecutor; +import org.elasticsearch.xpack.sql.session.Cursor.Page; import java.util.List; import java.util.function.Function; @@ -159,7 +160,7 @@ public class SqlSession { optimizedPlan(optimized, wrap(o -> listener.onResponse(planner.plan(o, verify)), listener::onFailure)); } - public void sql(String sql, List params, ActionListener listener) { + public void sql(String sql, List params, ActionListener listener) { sqlExecutable(sql, params, wrap(e -> e.execute(this, listener), listener::onFailure)); } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumnsTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumnsTests.java index 9c8c32689b7..509e8e954f3 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumnsTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumnsTests.java @@ -514,7 +514,7 @@ public class SysColumnsTests extends ESTestCase { return Void.TYPE; }).when(resolver).resolveAsMergedMapping(any(), any(), anyBoolean(), any()); - tuple.v1().execute(tuple.v2(), wrap(consumer::accept, ex -> fail(ex.getMessage()))); + tuple.v1().execute(tuple.v2(), wrap(p -> consumer.accept((SchemaRowSet) p.rowSet()), ex -> fail(ex.getMessage()))); } private Tuple sql(String sql, List params, Map mapping) { diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTablesTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTablesTests.java index d4db97aba09..834c5808b70 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTablesTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTablesTests.java @@ -386,6 +386,6 @@ public class SysTablesTests extends ESTestCase { return Void.TYPE; }).when(resolver).resolveNames(any(), any(), any(), any()); - tuple.v1().execute(tuple.v2(), wrap(consumer::accept, ex -> fail(ex.getMessage()))); + tuple.v1().execute(tuple.v2(), wrap(p -> consumer.accept((SchemaRowSet) p.rowSet()), ex -> fail(ex.getMessage()))); } } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypesTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypesTests.java index 805268dd5b6..6b7500cab66 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypesTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysTypesTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.xpack.sql.analysis.index.IndexResolver; import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry; import org.elasticsearch.xpack.sql.parser.SqlParser; import org.elasticsearch.xpack.sql.plan.logical.command.Command; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.type.DataType; import org.elasticsearch.xpack.sql.type.TypesTests; @@ -50,7 +51,8 @@ public class SysTypesTests extends ESTestCase { "INTERVAL_HOUR_TO_MINUTE", "INTERVAL_HOUR_TO_SECOND", "INTERVAL_MINUTE_TO_SECOND", "GEO_SHAPE", "GEO_POINT", "UNSUPPORTED", "OBJECT", "NESTED"); - cmd.execute(null, wrap(r -> { + cmd.execute(session(), wrap(p -> { + SchemaRowSet r = (SchemaRowSet) p.rowSet(); assertEquals(19, r.columnCount()); assertEquals(DataType.values().length, r.size()); assertFalse(r.schema().types().contains(DataType.NULL)); @@ -72,7 +74,8 @@ public class SysTypesTests extends ESTestCase { public void testSysTypesDefaultFiltering() { Command cmd = sql("SYS TYPES 0").v1(); - cmd.execute(null, wrap(r -> { + cmd.execute(session(), wrap(p -> { + SchemaRowSet r = (SchemaRowSet) p.rowSet(); assertEquals(DataType.values().length, r.size()); }, ex -> fail(ex.getMessage()))); } @@ -81,7 +84,8 @@ public class SysTypesTests extends ESTestCase { // boolean = 16 Command cmd = sql("SYS TYPES " + JDBCType.BOOLEAN.getVendorTypeNumber()).v1(); - cmd.execute(null, wrap(r -> { + cmd.execute(session(), wrap(p -> { + SchemaRowSet r = (SchemaRowSet) p.rowSet(); assertEquals(1, r.size()); assertEquals("BOOLEAN", r.column(0)); }, ex -> fail(ex.getMessage()))); @@ -90,7 +94,8 @@ public class SysTypesTests extends ESTestCase { public void testSysTypesNegativeFiltering() { Command cmd = sql("SYS TYPES " + JDBCType.TINYINT.getVendorTypeNumber()).v1(); - cmd.execute(null, wrap(r -> { + cmd.execute(session(), wrap(p -> { + SchemaRowSet r = (SchemaRowSet) p.rowSet(); assertEquals(1, r.size()); assertEquals("BYTE", r.column(0)); }, ex -> fail(ex.getMessage()))); @@ -99,7 +104,8 @@ public class SysTypesTests extends ESTestCase { public void testSysTypesMultipleMatches() { Command cmd = sql("SYS TYPES " + JDBCType.VARCHAR.getVendorTypeNumber()).v1(); - cmd.execute(null, wrap(r -> { + cmd.execute(session(), wrap(p -> { + SchemaRowSet r = (SchemaRowSet) p.rowSet(); assertEquals(3, r.size()); assertEquals("KEYWORD", r.column(0)); assertTrue(r.advanceRow()); @@ -108,4 +114,8 @@ public class SysTypesTests extends ESTestCase { assertEquals("IP", r.column(0)); }, ex -> fail(ex.getMessage()))); } + + private static SqlSession session() { + return new SqlSession(TestUtils.TEST_CFG, null, null, null, null, null, null, null, null); + } } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java similarity index 63% rename from x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursorTests.java rename to x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java index c042b99ddf3..2eff1c5c5f3 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/PagingListCursorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java @@ -3,21 +3,22 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.sql.execution.search; +package org.elasticsearch.xpack.sql.session; import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.sql.session.Cursors; +import org.elasticsearch.xpack.sql.session.ListCursor; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -public class PagingListCursorTests extends AbstractWireSerializingTestCase { - public static PagingListCursor randomPagingListCursor() { +public class ListCursorTests extends AbstractWireSerializingTestCase { + public static ListCursor randomPagingListCursor() { int size = between(1, 20); int depth = between(1, 20); @@ -26,14 +27,14 @@ public class PagingListCursorTests extends AbstractWireSerializingTestCase new Object[depth], () -> randomByte()))); } - return new PagingListCursor(values, depth, between(1, 20)); + return new ListCursor(values, between(1, 20), depth); } @Override - protected PagingListCursor mutateInstance(PagingListCursor instance) throws IOException { - return new PagingListCursor(instance.data(), - instance.columnCount(), - randomValueOtherThan(instance.pageSize(), () -> between(1, 20))); + protected ListCursor mutateInstance(ListCursor instance) throws IOException { + return new ListCursor(instance.data(), + randomValueOtherThan(instance.pageSize(), () -> between(1, 20)), + instance.columnCount()); } @Override @@ -42,22 +43,22 @@ public class PagingListCursorTests extends AbstractWireSerializingTestCase instanceReader() { - return PagingListCursor::new; + protected Reader instanceReader() { + return ListCursor::new; } @Override - protected PagingListCursor copyInstance(PagingListCursor instance, Version version) throws IOException { + protected ListCursor copyInstance(ListCursor instance, Version version) throws IOException { /* Randomly choose between internal protocol round trip and String based * round trips used to toXContent. */ if (randomBoolean()) { return super.copyInstance(instance, version); } - return (PagingListCursor) Cursors.decodeFromString(Cursors.encodeToString(version, instance)); + return (ListCursor) Cursors.decodeFromString(Cursors.encodeToString(version, instance)); } } \ No newline at end of file