diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java index 5962dca2152..3f12cc642f9 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java @@ -20,7 +20,7 @@ import org.elasticsearch.xpack.sql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.sql.planner.Planner; import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.session.SqlSettings; @@ -66,16 +66,16 @@ public class PlanExecutor { } } - public void sql(String sql, ActionListener listener) { + public void sql(String sql, ActionListener listener) { sql(SqlSettings.EMPTY, sql, listener); } - public void sql(SqlSettings sqlSettings, String sql, ActionListener listener) { + public void sql(SqlSettings sqlSettings, String sql, ActionListener listener) { SqlSession session = newSession(sqlSettings); session.executable(sql).execute(session, listener); } - public void nextPage(Cursor cursor, ActionListener listener) { + public void nextPage(Cursor cursor, ActionListener listener) { cursor.nextPage(client, listener); } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AggsRowSetCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AggsRowSet.java similarity index 83% rename from sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AggsRowSetCursor.java rename to sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AggsRowSet.java index 32507d7686c..f4564977c4e 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AggsRowSetCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AggsRowSet.java @@ -5,21 +5,21 @@ */ package org.elasticsearch.xpack.sql.execution.search; -import org.elasticsearch.xpack.sql.session.AbstractRowSetCursor; +import org.elasticsearch.xpack.sql.session.AbstractRowSet; import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.type.Schema; import java.util.List; import java.util.function.Supplier; -class AggsRowSetCursor extends AbstractRowSetCursor { +class AggsRowSet extends AbstractRowSet { private int row = 0; private final AggValues agg; private final List> columns; - AggsRowSetCursor(Schema schema, AggValues agg, List> columns) { - super(schema, null); + AggsRowSet(Schema schema, AggValues agg, List> columns) { + super(schema); this.agg = agg; this.columns = columns; } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java index cc4a5b1872b..25c6ea7b1b3 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java @@ -19,7 +19,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors; import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.type.DataType; import org.elasticsearch.xpack.sql.type.Schema; @@ -103,7 +103,7 @@ public class ScrollCursor implements Cursor { } @Override - public void nextPage(Client client, ActionListener listener) { + public void nextPage(Client client, ActionListener listener) { // Fake the schema for now. We'll try to remove the need later. List names = new ArrayList<>(extractors.size()); List dataTypes = new ArrayList<>(extractors.size()); @@ -123,7 +123,7 @@ public class ScrollCursor implements Cursor { client.searchScroll(request, ActionListener.wrap((SearchResponse response) -> { int limitHits = -1; // NOCOMMIT do a thing with this listener.onResponse(new SearchHitRowSetCursor(schema, extractors, response.getHits().getHits(), - limitHits, response.getScrollId(), null)); + limitHits, response.getScrollId())); }, listener::onFailure)); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java index 4f2b7f3ba6e..1d59d5ce6d5 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; @@ -44,7 +43,7 @@ import org.elasticsearch.xpack.sql.querydsl.container.QueryContainer; import org.elasticsearch.xpack.sql.querydsl.container.ScriptFieldRef; import org.elasticsearch.xpack.sql.querydsl.container.SearchHitFieldRef; import org.elasticsearch.xpack.sql.querydsl.container.TotalCountRef; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SqlSettings; import org.elasticsearch.xpack.sql.type.Schema; @@ -53,7 +52,6 @@ import org.elasticsearch.xpack.sql.util.StringUtils; import java.util.ArrayList; import java.util.List; -import java.util.function.Consumer; import java.util.function.Supplier; // TODO: add retry/back-off public class Scroller { @@ -76,7 +74,7 @@ public class Scroller { this.size = size; } - public void scroll(Schema schema, QueryContainer query, String index, ActionListener listener) { + public void scroll(Schema schema, QueryContainer query, String index, ActionListener listener) { // prepare the request SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, size); @@ -88,29 +86,28 @@ public class Scroller { search.scroll(keepAlive).source().timeout(timeout); boolean isAggsOnly = query.isAggsOnly(); - - ScrollerActionListener l = isAggsOnly ? new AggsScrollActionListener(listener, client, timeout, schema, query) : new HandshakeScrollActionListener(listener, client, timeout, schema, query); + + ScrollerActionListener l; + if (isAggsOnly) { + l = new AggsScrollActionListener(listener, client, timeout, schema, query); + } else { + l = new HandshakeScrollActionListener(listener, client, timeout, schema, query); + } client.search(search, l); } - - static void from(ActionListener listener, SearchHitsActionListener previous, String scrollId, List ext) { - ScrollerActionListener l = new SessionScrollActionListener(listener, previous.client, previous.keepAlive, previous.schema, ext, previous.limit, previous.docsRead); - previous.client.searchScroll(new SearchScrollRequest(scrollId).scroll(previous.keepAlive), l); - } - // dedicated scroll used for aggs-only/group-by results static class AggsScrollActionListener extends ScrollerActionListener { private final QueryContainer query; - AggsScrollActionListener(ActionListener listener, Client client, TimeValue keepAlive, Schema schema, QueryContainer query) { + AggsScrollActionListener(ActionListener listener, Client client, TimeValue keepAlive, Schema schema, QueryContainer query) { super(listener, client, keepAlive, schema); this.query = query; } @Override - protected RowSetCursor handleResponse(SearchResponse response) { + protected RowSet handleResponse(SearchResponse response) { final List extractedAggs = new ArrayList<>(); AggValues aggValues = new AggValues(extractedAggs); @@ -151,7 +148,7 @@ public class Scroller { aggValues.init(maxDepth, query.limit()); clearScroll(response.getScrollId()); - return new AggsRowSetCursor(schema, aggValues, aggColumns); + return new AggsRowSet(schema, aggValues, aggColumns); } private Object[] extractAggValue(ColumnReference col, SearchResponse response) { @@ -209,12 +206,12 @@ public class Scroller { } // initial scroll used for parsing search hits (handles possible aggs) - static class HandshakeScrollActionListener extends SearchHitsActionListener { - + static class HandshakeScrollActionListener extends ScrollerActionListener { private final QueryContainer query; - HandshakeScrollActionListener(ActionListener listener, Client client, TimeValue keepAlive, Schema schema, QueryContainer query) { - super(listener, client, keepAlive, schema, query.limit(), 0); + HandshakeScrollActionListener(ActionListener listener, Client client, TimeValue keepAlive, + Schema schema, QueryContainer query) { + super(listener, client, keepAlive, schema); this.query = query; } @@ -222,9 +219,49 @@ public class Scroller { public void onResponse(SearchResponse response) { super.onResponse(response); } + + protected RowSet handleResponse(SearchResponse response) { + SearchHit[] hits = response.getHits().getHits(); + List exts = getExtractors(); - @Override - protected List getExtractors() { + // there are some results + if (hits.length > 0) { + String scrollId = response.getScrollId(); + + // if there's an id, try to setup next scroll + if (scrollId != null) { + // is all the content already retrieved? + if (Boolean.TRUE.equals(response.isTerminatedEarly()) || response.getHits().getTotalHits() == hits.length + // or maybe the limit has been reached + || (hits.length >= query.limit() && query.limit() > -1)) { + // if so, clear the scroll + clearScroll(scrollId); + // and remove it to indicate no more data is expected + scrollId = null; + } + } + int limitHits = query.limit() > 0 && hits.length >= query.limit() ? query.limit() : -1; + return new SearchHitRowSetCursor(schema, exts, hits, limitHits, scrollId); + } + // no hits + else { + clearScroll(response.getScrollId()); + // typically means last page but might be an aggs only query + return needsHit(exts) ? Rows.empty(schema) : new SearchHitRowSetCursor(schema, exts); + } + } + + private static boolean needsHit(List exts) { + for (HitExtractor ext : exts) { + // Anything non-constant requires extraction + if (!(ext instanceof ConstantExtractor)) { + return true; + } + } + return false; + } + + private List getExtractors() { // create response extractors for the first time List refs = query.columns(); @@ -262,92 +299,15 @@ public class Scroller { } } - // listener used for streaming the rest of the results after the handshake has been used - static class SessionScrollActionListener extends SearchHitsActionListener { - - private List exts; - - SessionScrollActionListener(ActionListener listener, Client client, TimeValue keepAlive, Schema schema, List ext, int limit, int docCount) { - super(listener, client, keepAlive, schema, limit, docCount); - this.exts = ext; - } - - @Override - protected List getExtractors() { - return exts; - } - } - - public abstract static class SearchHitsActionListener extends ScrollerActionListener { - - final int limit; - int docsRead; - - SearchHitsActionListener(ActionListener listener, Client client, TimeValue keepAlive, Schema schema, int limit, int docsRead) { - super(listener, client, keepAlive, schema); - this.limit = limit; - this.docsRead = docsRead; - } - - protected RowSetCursor handleResponse(SearchResponse response) { - SearchHit[] hits = response.getHits().getHits(); - List exts = getExtractors(); - - // there are some results - if (hits.length > 0) { - String scrollId = response.getScrollId(); - Consumer> next = null; - - docsRead += hits.length; - - // if there's an id, try to setup next scroll - if (scrollId != null) { - // is all the content already retrieved? - if (Boolean.TRUE.equals(response.isTerminatedEarly()) || response.getHits().getTotalHits() == hits.length - // or maybe the limit has been reached - || (docsRead >= limit && limit > -1)) { - // if so, clear the scroll - clearScroll(scrollId); - // and remove it to indicate no more data is expected - scrollId = null; - } - else { - next = l -> Scroller.from(l, this, response.getScrollId(), exts); - } - } - int limitHits = limit > 0 && docsRead >= limit ? limit : -1; - return new SearchHitRowSetCursor(schema, exts, hits, limitHits, scrollId, next); - } - // no hits - else { - clearScroll(response.getScrollId()); - // typically means last page but might be an aggs only query - return needsHit(exts) ? Rows.empty(schema) : new SearchHitRowSetCursor(schema, exts); - } - } - - private static boolean needsHit(List exts) { - for (HitExtractor ext : exts) { - // Anything non-constant requires extraction - if (!(ext instanceof ConstantExtractor)) { - return true; - } - } - return false; - } - - protected abstract List getExtractors(); - } - abstract static class ScrollerActionListener implements ActionListener { - final ActionListener listener; + final ActionListener listener; final Client client; final TimeValue keepAlive; final Schema schema; - ScrollerActionListener(ActionListener listener, Client client, TimeValue keepAlive, Schema schema) { + ScrollerActionListener(ActionListener listener, Client client, TimeValue keepAlive, Schema schema) { this.listener = listener; this.client = client; @@ -369,7 +329,7 @@ public class Scroller { } } - protected abstract RowSetCursor handleResponse(SearchResponse response); + protected abstract RowSet handleResponse(SearchResponse response); protected final void clearScroll(String scrollId) { if (scrollId != null) { diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSetCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSetCursor.java index a5c1d3a2b76..0252918ed64 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSetCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSetCursor.java @@ -5,28 +5,23 @@ */ package org.elasticsearch.xpack.sql.execution.search; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; -import org.elasticsearch.xpack.sql.session.AbstractRowSetCursor; +import org.elasticsearch.xpack.sql.session.AbstractRowSet; import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.RowSetCursor; import org.elasticsearch.xpack.sql.type.Schema; import java.util.Arrays; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; -import java.util.function.Consumer; -// -// Since the results might contain nested docs, the iteration is similar to that of Aggregation -// namely it discovers the nested docs and then, for iteration, increments the deepest level first -// and eventually carries that over to the top level - -public class SearchHitRowSetCursor extends AbstractRowSetCursor { +/** + * Extracts rows from an array of {@link SearchHit}. + */ +public class SearchHitRowSetCursor extends AbstractRowSet { private final SearchHit[] hits; private final String scrollId; private final List extractors; @@ -38,15 +33,19 @@ public class SearchHitRowSetCursor extends AbstractRowSetCursor { private int row = 0; SearchHitRowSetCursor(Schema schema, List exts) { - this(schema, exts, SearchHits.EMPTY, -1, null, null); + this(schema, exts, SearchHits.EMPTY, -1, null); } - SearchHitRowSetCursor(Schema schema, List exts, SearchHit[] hits, int limitHits, String scrollId, Consumer> nextSet) { - super(schema, nextSet); + SearchHitRowSetCursor(Schema schema, List exts, SearchHit[] hits, int limitHits, String scrollId) { + super(schema); this.hits = hits; this.scrollId = scrollId; this.extractors = exts; + // Since the results might contain nested docs, the iteration is similar to that of Aggregation + // namely it discovers the nested docs and then, for iteration, increments the deepest level first + // and eventually carries that over to the top level + String innerHit = null; for (HitExtractor ex : exts) { innerHit = ex.innerHitName(); diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java index a8bb2665d7e..dc143eed94c 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java @@ -8,7 +8,7 @@ package org.elasticsearch.xpack.sql.plan.logical; import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.session.Executable; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; @@ -41,7 +41,7 @@ public class LocalRelation extends LogicalPlan implements Executable { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { executable.execute(session, listener); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java index 210a8030c55..1cc9961487c 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Command.java @@ -8,7 +8,7 @@ package org.elasticsearch.xpack.sql.plan.logical.command; import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.sql.session.Executable; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; @@ -26,9 +26,9 @@ public abstract class Command extends LogicalPlan implements Executable { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { listener.onResponse(execute(session)); } - protected abstract RowSetCursor execute(SqlSession session); + protected abstract RowSet execute(SqlSession session); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java index 3216a6c93ae..2005b42b347 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java @@ -5,18 +5,13 @@ */ package org.elasticsearch.xpack.sql.plan.logical.command; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.sql.rule.RuleExecutor.Batch; import org.elasticsearch.xpack.sql.rule.RuleExecutor.ExecutionInfo; import org.elasticsearch.xpack.sql.rule.RuleExecutor.Transformation; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; @@ -25,6 +20,10 @@ import org.elasticsearch.xpack.sql.tree.NodeUtils; import org.elasticsearch.xpack.sql.type.DataTypes; import org.elasticsearch.xpack.sql.util.Graphviz; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import static java.util.Collections.singletonList; @@ -69,7 +68,7 @@ public class Debug extends Command { @SuppressWarnings({ "rawtypes", "unchecked" }) @Override - protected RowSetCursor execute(SqlSession session) { + protected RowSet execute(SqlSession session) { String planString = null; ExecutionInfo info = null; diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java index 7735a63d495..db2fbb4cde8 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java @@ -11,7 +11,7 @@ import org.elasticsearch.xpack.sql.plan.QueryPlan; import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.sql.planner.Planner; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; @@ -71,7 +71,7 @@ public class Explain extends Command { } @Override - protected RowSetCursor execute(SqlSession session) { + protected RowSet execute(SqlSession session) { String planString = null; String planName = "Parsed"; diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/SessionReset.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/SessionReset.java index 5f5f2d06abf..9c6ab485e06 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/SessionReset.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/SessionReset.java @@ -5,21 +5,21 @@ */ package org.elasticsearch.xpack.sql.plan.logical.command; -import java.util.List; -import java.util.Objects; -import java.util.regex.Pattern; - import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings.Builder; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.type.DataTypes; import org.elasticsearch.xpack.sql.util.StringUtils; +import java.util.List; +import java.util.Objects; +import java.util.regex.Pattern; + import static java.util.Arrays.asList; import static java.util.stream.Collectors.toList; @@ -48,7 +48,8 @@ public class SessionReset extends Command { } @Override - protected RowSetCursor execute(SqlSession session) { + protected RowSet execute(SqlSession session) { + // NOCOMMIT this isn't likely to work any more. None of the session stuff is. session.updateSettings(s -> { Settings defaults = session.defaults().cfg(); Builder builder = Settings.builder().put(s); diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/SessionSet.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/SessionSet.java index 6ad9ad4f10e..90e9b263fd5 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/SessionSet.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/SessionSet.java @@ -5,18 +5,18 @@ */ package org.elasticsearch.xpack.sql.plan.logical.command; -import java.util.List; -import java.util.Objects; - import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.type.DataTypes; +import java.util.List; +import java.util.Objects; + import static java.util.Collections.singletonList; public class SessionSet extends Command { @@ -44,7 +44,7 @@ public class SessionSet extends Command { } @Override - protected RowSetCursor execute(SqlSession session) { + protected RowSet execute(SqlSession session) { session.updateSettings(s -> { return Settings.builder().put(s).put(key, value).build(); }); diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java index 3cd12fd3933..60c790c6617 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java @@ -9,7 +9,7 @@ import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; @@ -45,7 +45,7 @@ public class ShowColumns extends Command { } @Override - protected RowSetCursor execute(SqlSession session) { + protected RowSet execute(SqlSession session) { List> rows = new ArrayList<>(); EsIndex fetched; try { diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java index 9e38721b18a..6df6470490d 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java @@ -5,20 +5,20 @@ */ package org.elasticsearch.xpack.sql.plan.logical.command; -import java.util.Collection; -import java.util.List; -import java.util.Objects; - import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; import org.elasticsearch.xpack.sql.expression.function.FunctionDefinition; import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.type.DataTypes; +import java.util.Collection; +import java.util.List; +import java.util.Objects; + import static java.util.Arrays.asList; import static java.util.stream.Collectors.toList; @@ -42,7 +42,7 @@ public class ShowFunctions extends Command { } @Override - protected RowSetCursor execute(SqlSession session) { + protected RowSet execute(SqlSession session) { FunctionRegistry registry = session.functionRegistry(); Collection functions = registry.listFunctions(pattern); diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java index ddcc641554f..52a82c3713c 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java @@ -5,16 +5,16 @@ */ package org.elasticsearch.xpack.sql.plan.logical.command; -import java.util.List; - import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.type.DataTypes; +import java.util.List; + import static java.util.Collections.singletonList; public class ShowSchemas extends Command { @@ -29,7 +29,7 @@ public class ShowSchemas extends Command { } @Override - protected RowSetCursor execute(SqlSession session) { + protected RowSet execute(SqlSession session) { return Rows.empty(output()); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSession.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSession.java index 33e9796c4fc..d00011d4204 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSession.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSession.java @@ -5,20 +5,20 @@ */ package org.elasticsearch.xpack.sql.plan.logical.command; -import java.util.List; -import java.util.Objects; -import java.util.regex.Pattern; - import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.type.DataTypes; import org.elasticsearch.xpack.sql.util.StringUtils; +import java.util.List; +import java.util.Objects; +import java.util.regex.Pattern; + import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static java.util.stream.Collectors.toList; @@ -48,7 +48,7 @@ public class ShowSession extends Command { } @Override - protected RowSetCursor execute(SqlSession session) { + protected RowSet execute(SqlSession session) { List> out; Settings s = session.settings().cfg(); diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java index 2e8e3fdfffc..377f5478ff4 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java @@ -11,7 +11,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; @@ -45,7 +45,7 @@ public class ShowTables extends Command { } @Override - public final void execute(SqlSession session, ActionListener listener) { + public final void execute(SqlSession session, ActionListener listener) { String pattern = Strings.hasText(this.pattern) ? StringUtils.jdbcToEsPattern(this.pattern) : "*"; session.getIndices(new String[] {pattern}, IndicesOptions.lenientExpandOpen(), ActionListener.wrap(result -> { listener.onResponse(Rows.of(output(), result.stream() @@ -55,7 +55,7 @@ public class ShowTables extends Command { } @Override - protected RowSetCursor execute(SqlSession session) { + protected RowSet execute(SqlSession session) { throw new UnsupportedOperationException("No synchronous exec"); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java index 3d454f01e79..9f2a1e4534d 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java @@ -5,15 +5,15 @@ */ package org.elasticsearch.xpack.sql.plan.physical; -import java.util.List; -import java.util.Objects; - import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.plan.logical.command.Command; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SqlSession; +import java.util.List; +import java.util.Objects; + public class CommandExec extends LeafExec { private final Command command; @@ -28,7 +28,7 @@ public class CommandExec extends LeafExec { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { command.execute(session, listener); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java index 85df5be5eca..d2544e52557 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java @@ -9,7 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.execution.search.Scroller; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.querydsl.container.QueryContainer; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; @@ -49,7 +49,7 @@ public class EsQueryExec extends LeafExec { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { Scroller scroller = new Scroller(session.client(), session.settings()); scroller.scroll(Rows.schema(output), queryContainer, index, listener); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java index 4fa4fa0b834..839a2f4cfb5 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java @@ -9,7 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.session.EmptyExecutable; import org.elasticsearch.xpack.sql.session.Executable; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; @@ -39,7 +39,7 @@ public class LocalExec extends LeafExec { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { executable.execute(session, listener); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java index 5546cc460fb..21a6c56d7cf 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java @@ -5,20 +5,20 @@ */ package org.elasticsearch.xpack.sql.plan.physical; -import java.util.Locale; - import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.planner.PlanningException; import org.elasticsearch.xpack.sql.session.Executable; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SqlSession; +import java.util.Locale; + import static java.lang.String.format; // this is mainly a marker interface to validate a plan before being executed public interface Unexecutable extends Executable { - default void execute(SqlSession session, ActionListener listener) { + default void execute(SqlSession session, ActionListener listener) { throw new PlanningException(format(Locale.ROOT, "Current plan %s is not executable", this)); } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java index 1d9b48127f5..f6140934a49 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java @@ -17,7 +17,7 @@ import org.elasticsearch.xpack.sql.execution.PlanExecutor; import org.elasticsearch.xpack.sql.plugin.SqlLicenseChecker; import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse.ColumnInfo; import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.session.RowSetCursor; +import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SqlSettings; import org.elasticsearch.xpack.sql.type.Schema; @@ -64,7 +64,7 @@ public class TransportSqlAction extends HandledTransportAction columns = null; if (includeColumnMetadata) { columns = new ArrayList<>(cursor.schema().types().size()); diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/AbstractRowSetCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/AbstractRowSet.java similarity index 80% rename from sql/server/src/main/java/org/elasticsearch/xpack/sql/session/AbstractRowSetCursor.java rename to sql/server/src/main/java/org/elasticsearch/xpack/sql/session/AbstractRowSet.java index 4411118cef7..283bf7c9392 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/AbstractRowSetCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/AbstractRowSet.java @@ -5,25 +5,19 @@ */ package org.elasticsearch.xpack.sql.session; -import java.util.function.Consumer; - -import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.type.Schema; import org.elasticsearch.xpack.sql.util.Assert; -public abstract class AbstractRowSetCursor implements RowSetCursor { +public abstract class AbstractRowSet implements RowSet { private final Schema schema; private final int size; - private final Consumer> nextSet; - private boolean terminated = false; - protected AbstractRowSetCursor(Schema schema, Consumer> nextSet) { + protected AbstractRowSet(Schema schema) { this.schema = schema; this.size = schema().names().size(); - this.nextSet = nextSet; } @Override @@ -65,21 +59,6 @@ public abstract class AbstractRowSetCursor implements RowSetCursor { protected abstract void doReset(); - @Override - public boolean hasNextSet() { - return nextSet != null; - } - - @Override - public void nextSet(ActionListener listener) { - if (nextSet != null) { - nextSet.accept(listener); - } - else { - listener.onResponse(null); - } - } - @Override public int rowSize() { return size; diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java index e638c176f17..45024f27750 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Cursor.java @@ -27,7 +27,7 @@ public interface Cursor extends NamedWriteable { /** * Request the next page of data. */ - void nextPage(Client client, ActionListener listener); + void nextPage(Client client, ActionListener listener); /** * Write the {@linkplain Cursor} to a String for serialization over xcontent. */ diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java index 49ddf5898bd..56c7827ba6d 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java @@ -35,7 +35,7 @@ class EmptyCursor implements Cursor { } @Override - public void nextPage(Client client, ActionListener listener) { + public void nextPage(Client client, ActionListener listener) { throw new IllegalArgumentException("there is no next page"); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java index 81f93417c51..fb9df0cf942 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java @@ -5,11 +5,11 @@ */ package org.elasticsearch.xpack.sql.session; +import org.elasticsearch.xpack.sql.expression.Attribute; + import java.util.List; import java.util.Objects; -import org.elasticsearch.xpack.sql.expression.Attribute; - public class EmptyExecutable implements Executable { private final List output; @@ -24,7 +24,7 @@ public class EmptyExecutable implements Executable { } @Override - public void execute(SqlSession session, org.elasticsearch.action.ActionListener listener) { + public void execute(SqlSession session, org.elasticsearch.action.ActionListener listener) { listener.onResponse(Rows.empty(output)); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSetCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSetCursor.java index 133f4b34b9b..f32d73dd63d 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSetCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSetCursor.java @@ -7,10 +7,10 @@ package org.elasticsearch.xpack.sql.session; import org.elasticsearch.xpack.sql.type.Schema; -class EmptyRowSetCursor extends AbstractRowSetCursor { +class EmptyRowSetCursor extends AbstractRowSet { EmptyRowSetCursor(Schema schema) { - super(schema, null); + super(schema); } @Override diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java index abb3ac5a5e8..d6c9652da2d 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java @@ -14,5 +14,5 @@ public interface Executable { List output(); - void execute(SqlSession session, ActionListener listener); + void execute(SqlSession session, ActionListener listener); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSetCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSetCursor.java index 2dbc8ea271e..1c31e96b74b 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSetCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSetCursor.java @@ -9,13 +9,13 @@ import org.elasticsearch.xpack.sql.type.Schema; import java.util.List; -class ListRowSetCursor extends AbstractRowSetCursor { +class ListRowSetCursor extends AbstractRowSet { private final List> list; private int pos = 0; ListRowSetCursor(Schema schema, List> list) { - super(schema, null); + super(schema); this.list = list; } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java index 0ddf807eda1..bcf9a51439c 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java @@ -11,12 +11,8 @@ import org.elasticsearch.xpack.sql.execution.PlanExecutor; import java.util.function.Consumer; /** - * Interface representing a set of rows produced by the SQL engine. Builds on top of a RowView to _prevent_ - * a view object from being instantiated or returned. - * In other words to enforce immediate consumption (before moving forward). - * - * If (when) joins and such will be enabled, this interface would have to be retro-fitted - * to become even more lazy (so that things like number of entries) would not be known. + * A set of rows to be returned at one time and a way + * to get the next set of rows. */ public interface RowSet extends RowView { @@ -29,7 +25,7 @@ public interface RowSet extends RowView { void reset(); /** - * Return the key used by {@link PlanExecutor#nextPage(Cursor, ActionListener)} to fetch the next page. + * The key used by {@link PlanExecutor#nextPage(Cursor, ActionListener)} to fetch the next page. */ Cursor nextPageCursor(); @@ -38,4 +34,4 @@ public interface RowSet extends RowView { action.accept(this); } } -} \ No newline at end of file +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowSetCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowSetCursor.java deleted file mode 100644 index 12de4e85c08..00000000000 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowSetCursor.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.session; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.xpack.sql.SqlException; - -import java.util.Objects; -import java.util.function.Consumer; - -public interface RowSetCursor extends RowSet { - - boolean hasNextSet(); - - void nextSet(ActionListener listener); - - default void forEachSet(Consumer action) { - Objects.requireNonNull(action); - - action.accept(this); - if (hasNextSet()) { - nextSet(new ActionListener() { - @Override - public void onResponse(RowSetCursor cursor) { - forEachSet(action); - } - - @Override - public void onFailure(Exception ex) { - throw ex instanceof RuntimeException ? (RuntimeException) ex : new SqlException(ex); - } - }); - } - } -} \ No newline at end of file diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowView.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowView.java index 3e9cd934e58..9131af20751 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowView.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowView.java @@ -5,14 +5,13 @@ */ package org.elasticsearch.xpack.sql.session; +import org.elasticsearch.xpack.sql.type.Schema; + import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Objects; -import java.util.function.BiConsumer; import java.util.function.Consumer; -import org.elasticsearch.xpack.sql.type.Schema; - /** * A view into a row. * Offers access to the data but it shouldn't be held since it is not a data container. diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Rows.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Rows.java index cd0d8b55f4c..98f936922de 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Rows.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Rows.java @@ -50,7 +50,7 @@ public abstract class Rows { return new Schema(asList(n1, n2, n3, n4, n5), asList(t1, t2, t3, t4, t5)); } - public static RowSetCursor of(List attrs, List> values) { + public static RowSet of(List attrs, List> values) { if (values.isEmpty()) { return empty(attrs); } @@ -63,16 +63,16 @@ public abstract class Rows { return new ListRowSetCursor(schema, values); } - public static RowSetCursor singleton(List attrs, Object... values) { + public static RowSet singleton(List attrs, Object... values) { Assert.isTrue(attrs.size() == values.length, "Schema %s and values %s are out of sync", attrs, values); return new SingletonRowSet(schema(attrs), values); } - public static RowSetCursor empty(Schema schema) { + public static RowSet empty(Schema schema) { return new EmptyRowSetCursor(schema); } - public static RowSetCursor empty(List attrs) { + public static RowSet empty(List attrs) { return new EmptyRowSetCursor(schema(attrs)); } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java index a81a7bb33db..c93150c6ee2 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java @@ -28,7 +28,7 @@ public class SingletonExecutable implements Executable { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { listener.onResponse(Rows.singleton(output, values)); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java index 9c00a9cf481..fd2a3986803 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java @@ -7,12 +7,12 @@ package org.elasticsearch.xpack.sql.session; import org.elasticsearch.xpack.sql.type.Schema; -class SingletonRowSet extends AbstractRowSetCursor { // NOCOMMIT is it worth keeping this when we have ListRowSet? +class SingletonRowSet extends AbstractRowSet { // NOCOMMIT is it worth keeping this when we have ListRowSet? private final Object[] values; SingletonRowSet(Schema schema, Object[] values) { - super(schema, null); + super(schema); this.values = values; } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java index a33c30b60b5..3563314b833 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java @@ -136,7 +136,7 @@ public class SqlSession { } } - public void sql(String sql, ActionListener listener) { + public void sql(String sql, ActionListener listener) { executable(sql).execute(this, listener); } @@ -153,7 +153,7 @@ public class SqlSession { return settings; } - public void execute(PhysicalPlan plan, ActionListener listener) { + public void execute(PhysicalPlan plan, ActionListener listener) { plan.execute(this, listener); } } \ No newline at end of file