From 6ce140cf0f6f501ed87d839072edf8170f2c729a Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Sat, 4 Nov 2017 22:52:13 +0000 Subject: [PATCH] SQL: Make schema optional in RowSet (elastic/x-pack-elasticsearch#2834) This prevents us from having to hack a fake schema together on the second (and third and fourth, etc) page of results. Original commit: elastic/x-pack-elasticsearch@7ba3119daa307e8e3ec79588b94bda9cbb4b5ab7 --- .../xpack/sql/execution/PlanExecutor.java | 5 +- ...rsor.java => AbstractSearchHitRowSet.java} | 20 ++-- .../sql/execution/search/AggsRowSet.java | 16 ++- .../search/InitialSearchHitRowSet.java | 32 +++++ .../sql/execution/search/ScrollCursor.java | 11 +- .../search/ScrolledSearchHitRowSet.java | 31 +++++ .../xpack/sql/execution/search/Scroller.java | 110 ++++++++---------- .../xpack/sql/plan/logical/LocalRelation.java | 4 +- .../xpack/sql/plan/logical/command/Debug.java | 9 +- .../sql/plan/logical/command/Explain.java | 15 +-- .../sql/plan/logical/command/ShowColumns.java | 6 +- .../plan/logical/command/ShowFunctions.java | 9 +- .../sql/plan/logical/command/ShowSchemas.java | 3 +- .../sql/plan/logical/command/ShowTables.java | 9 +- .../xpack/sql/plan/physical/CommandExec.java | 10 +- .../xpack/sql/plan/physical/EsQueryExec.java | 7 +- .../xpack/sql/plan/physical/LocalExec.java | 4 +- .../xpack/sql/plan/physical/Unexecutable.java | 4 +- .../plugin/sql/action/TransportSqlAction.java | 33 +++--- .../xpack/sql/session/AbstractRowSet.java | 28 +---- .../xpack/sql/session/EmptyExecutable.java | 3 +- .../xpack/sql/session/EmptyRowSetCursor.java | 10 +- .../xpack/sql/session/Executable.java | 2 +- .../xpack/sql/session/ListRowSetCursor.java | 10 +- .../xpack/sql/session/RowSet.java | 1 + .../xpack/sql/session/RowView.java | 18 ++- .../elasticsearch/xpack/sql/session/Rows.java | 12 +- .../xpack/sql/session/SchemaRowSet.java | 24 ++++ .../sql/session/SingletonExecutable.java | 4 +- .../xpack/sql/session/SingletonRowSet.java | 12 +- .../xpack/sql/session/SqlSession.java | 10 +- 31 files changed, 272 insertions(+), 200 deletions(-) rename sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/{SearchHitRowSetCursor.java => AbstractSearchHitRowSet.java} (92%) create mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/InitialSearchHitRowSet.java create mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrolledSearchHitRowSet.java create mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SchemaRowSet.java diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java index 1a0b35eed9b..1a86ad67ea7 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.sql.planner.Planner; import org.elasticsearch.xpack.sql.planner.PlanningException; import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.session.RowSet; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.session.Configuration; @@ -67,11 +68,11 @@ public class PlanExecutor { } } - public void sql(String sql, ActionListener listener) { + public void sql(String sql, ActionListener listener) { sql(Configuration.DEFAULT, sql, listener); } - public void sql(Configuration sqlSettings, String sql, ActionListener listener) { + public void sql(Configuration sqlSettings, String sql, ActionListener listener) { SqlSession session = newSession(sqlSettings); try { PhysicalPlan executable = session.executable(sql); diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSetCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AbstractSearchHitRowSet.java similarity index 92% rename from sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSetCursor.java rename to sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AbstractSearchHitRowSet.java index 28f8308a44f..a538ecc4d05 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSetCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AbstractSearchHitRowSet.java @@ -11,7 +11,6 @@ import org.elasticsearch.xpack.sql.SqlIllegalArgumentException; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; import org.elasticsearch.xpack.sql.session.AbstractRowSet; import org.elasticsearch.xpack.sql.session.Cursor; -import org.elasticsearch.xpack.sql.type.Schema; import java.util.Arrays; import java.util.LinkedHashSet; @@ -21,7 +20,7 @@ import java.util.Set; /** * Extracts rows from an array of {@link SearchHit}. */ -public class SearchHitRowSetCursor extends AbstractRowSet { +abstract class AbstractSearchHitRowSet extends AbstractRowSet { private final SearchHit[] hits; private final String scrollId; private final List extractors; @@ -33,12 +32,7 @@ public class SearchHitRowSetCursor extends AbstractRowSet { private final int[] indexPerLevel; private int row = 0; - SearchHitRowSetCursor(Schema schema, List exts) { - this(schema, exts, SearchHits.EMPTY, -1, null); - } - - SearchHitRowSetCursor(Schema schema, List exts, SearchHit[] hits, int limitHits, String scrollId) { - super(schema); + AbstractSearchHitRowSet(List exts, SearchHit[] hits, int limitHits, String scrollId) { this.hits = hits; this.scrollId = scrollId; this.extractors = exts; @@ -56,7 +50,7 @@ public class SearchHitRowSetCursor extends AbstractRowSet { } int sz = hits.length; - + int maxDepth = 0; if (!innerHits.isEmpty()) { if (innerHits.size() > 1) { @@ -87,7 +81,7 @@ public class SearchHitRowSetCursor extends AbstractRowSet { protected Object getColumn(int column) { HitExtractor e = extractors.get(column); int extractorLevel = e.innerHitName() == null ? 0 : 1; - + SearchHit hit = null; SearchHit[] sh = hits; for (int lvl = 0; lvl <= extractorLevel ; lvl++) { @@ -98,7 +92,7 @@ public class SearchHitRowSetCursor extends AbstractRowSet { } hit = sh[indexPerLevel[lvl]]; } - + return e.get(hit); } @@ -135,7 +129,7 @@ public class SearchHitRowSetCursor extends AbstractRowSet { } } } - + return true; } return false; @@ -171,4 +165,4 @@ public class SearchHitRowSetCursor extends AbstractRowSet { } return new ScrollCursor(scrollId, extractors, remainingLimit); } -} \ No newline at end of file +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AggsRowSet.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AggsRowSet.java index f4564977c4e..d18040eaee2 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AggsRowSet.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AggsRowSet.java @@ -7,19 +7,20 @@ package org.elasticsearch.xpack.sql.execution.search; import org.elasticsearch.xpack.sql.session.AbstractRowSet; import org.elasticsearch.xpack.sql.session.Cursor; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.type.Schema; import java.util.List; import java.util.function.Supplier; -class AggsRowSet extends AbstractRowSet { - - private int row = 0; +class AggsRowSet extends AbstractRowSet implements SchemaRowSet { + private final Schema schema; private final AggValues agg; private final List> columns; + private int row = 0; AggsRowSet(Schema schema, AggValues agg, List> columns) { - super(schema); + this.schema = schema; this.agg = agg; this.columns = columns; } @@ -53,4 +54,9 @@ class AggsRowSet extends AbstractRowSet { public Cursor nextPageCursor() { return Cursor.EMPTY; } -} \ No newline at end of file + + @Override + public Schema schema() { + return schema; + } +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/InitialSearchHitRowSet.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/InitialSearchHitRowSet.java new file mode 100644 index 00000000000..403aa29e40e --- /dev/null +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/InitialSearchHitRowSet.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.execution.search; + +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; +import org.elasticsearch.xpack.sql.type.Schema; + +import java.util.List; + +/** + * Initial results from a scroll search. Distinct from the following pages + * because it has a {@link Schema} available. See {@link ScrolledSearchHitRowSet} + * for the next pages. + */ +public class InitialSearchHitRowSet extends AbstractSearchHitRowSet implements SchemaRowSet { + private final Schema schema; + + public InitialSearchHitRowSet(Schema schema, List exts, SearchHit[] hits, int limitHits, String scrollId) { + super(exts, hits, limitHits, scrollId); + this.schema = schema; + } + + @Override + public Schema schema() { + return schema; + } +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java index 2c240d5fea4..590b83e86bd 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java @@ -110,15 +110,6 @@ public class ScrollCursor implements Cursor { @Override public void nextPage(Client client, ActionListener listener) { - // Fake the schema for now. We'll try to remove the need later. - List names = new ArrayList<>(extractors.size()); - List dataTypes = new ArrayList<>(extractors.size()); - for (int i = 0; i < extractors.size(); i++) { - names.add("dummy"); - dataTypes.add(null); - } - // NOCOMMIT make schema properly nullable for the second page - Schema schema = new Schema(names, dataTypes); // NOCOMMIT add keep alive to the settings and pass it here /* Or something. The trouble is that settings is for *starting* * queries, but maybe we should actually have two sets of settings, @@ -128,7 +119,7 @@ public class ScrollCursor implements Cursor { SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(timeValueSeconds(90)); client.searchScroll(request, ActionListener.wrap((SearchResponse response) -> { int limitHits = limit; - listener.onResponse(new SearchHitRowSetCursor(schema, extractors, response.getHits().getHits(), + listener.onResponse(new ScrolledSearchHitRowSet(extractors, response.getHits().getHits(), limitHits, response.getScrollId())); }, listener::onFailure)); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrolledSearchHitRowSet.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrolledSearchHitRowSet.java new file mode 100644 index 00000000000..cac3b537b63 --- /dev/null +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrolledSearchHitRowSet.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.execution.search; + +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; +import org.elasticsearch.xpack.sql.type.Schema; + +import java.util.List; + +/** + * "Next" page of results from a scroll search. Distinct from the first page + * because it no longer has the {@link Schema}. See {@link InitialSearchHitRowSet} + * for the initial results. + */ +public class ScrolledSearchHitRowSet extends AbstractSearchHitRowSet { + private final int columnCount; + + public ScrolledSearchHitRowSet(List exts, SearchHit[] hits, int limitHits, String scrollId) { + super(exts, hits, limitHits, scrollId); + this.columnCount = exts.size(); + } + + @Override + public int columnCount() { + return columnCount; + } +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java index d2ddc9ce745..cf924487df0 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java @@ -47,6 +47,7 @@ import org.elasticsearch.xpack.sql.querydsl.container.TotalCountRef; import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.type.Schema; import org.elasticsearch.xpack.sql.util.StringUtils; @@ -73,7 +74,7 @@ public class Scroller { this.size = size; } - public void scroll(Schema schema, QueryContainer query, String index, ActionListener listener) { + public void scroll(Schema schema, QueryContainer query, String index, ActionListener listener) { // prepare the request SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, size); @@ -97,32 +98,32 @@ public class Scroller { // dedicated scroll used for aggs-only/group-by results static class AggsScrollActionListener extends ScrollerActionListener { - + private final QueryContainer query; - - AggsScrollActionListener(ActionListener listener, Client client, TimeValue keepAlive, Schema schema, QueryContainer query) { + + AggsScrollActionListener(ActionListener listener, Client client, TimeValue keepAlive, Schema schema, QueryContainer query) { super(listener, client, keepAlive, schema); this.query = query; } - + @Override - protected RowSet handleResponse(SearchResponse response) { - + protected SchemaRowSet handleResponse(SearchResponse response) { + final List extractedAggs = new ArrayList<>(); AggValues aggValues = new AggValues(extractedAggs); List> aggColumns = new ArrayList<>(query.columns().size()); - + // this method assumes the nested aggregation are all part of the same tree (the SQL group-by) int maxDepth = -1; - + List cols = query.columns(); for (int index = 0; index < cols.size(); index++) { ColumnReference col = cols.get(index); Supplier supplier = null; - + if (col instanceof ComputedRef) { ComputedRef pRef = (ComputedRef) col; - + Processor processor = pRef.processor().transformUp(a -> { Object[] value = extractAggValue(new AggRef(a.context()), response); extractedAggs.add(value); @@ -144,13 +145,13 @@ public class Scroller { final int aggPosition = extractedAggs.size() - 1; supplier = () -> aggValues.column(aggPosition); } - + aggColumns.add(supplier); if (col.depth() > maxDepth) { maxDepth = col.depth(); } } - + aggValues.init(maxDepth, query.limit()); clearScroll(response.getScrollId()); @@ -160,12 +161,12 @@ public class Scroller { private Object[] extractAggValue(ColumnReference col, SearchResponse response) { if (col == TotalCountRef.INSTANCE) { return new Object[] { Long.valueOf(response.getHits().getTotalHits()) }; - } + } else if (col instanceof AggRef) { Object[] arr; String path = ((AggRef) col).path(); - // yup, this is instance equality to make sure we only check the path used by the code + // yup, this is instance equality to make sure we only check the path used by the code if (path == TotalCountRef.PATH) { arr = new Object[] { Long.valueOf(response.getHits().getTotalHits()) }; } @@ -176,14 +177,14 @@ public class Scroller { path = AggPath.bucketValueWithoutFormat(path); } Object value = getAggProperty(response.getAggregations(), path); - + // // FIXME: this can be tabular in nature // if (ref instanceof MappedAggRef) { // Map map = (Map) value; // Object extractedValue = map.get(((MappedAggRef) // ref).fieldName()); // } - + if (formattedKey) { List buckets = ((MultiBucketsAggregation) value).getBuckets(); arr = new Object[buckets.size()]; @@ -194,12 +195,12 @@ public class Scroller { arr = value instanceof Object[] ? (Object[]) value : new Object[] { value }; } } - + return arr; } throw new SqlIllegalArgumentException("Unexpected non-agg/grouped column specified; %s", col.getClass()); } - + private static Object getAggProperty(Aggregations aggs, String path) { List list = AggregationPath.parse(path).getPathElementsAsStringList(); String aggName = list.get(0); @@ -210,30 +211,30 @@ public class Scroller { return agg.getProperty(list.subList(1, list.size())); } } - + // initial scroll used for parsing search hits (handles possible aggs) static class HandshakeScrollActionListener extends ScrollerActionListener { private final QueryContainer query; - - HandshakeScrollActionListener(ActionListener listener, Client client, TimeValue keepAlive, + + HandshakeScrollActionListener(ActionListener listener, Client client, TimeValue keepAlive, Schema schema, QueryContainer query) { super(listener, client, keepAlive, schema); this.query = query; } - + @Override public void onResponse(SearchResponse response) { super.onResponse(response); } - protected RowSet handleResponse(SearchResponse response) { + protected SchemaRowSet handleResponse(SearchResponse response) { SearchHit[] hits = response.getHits().getHits(); List exts = getExtractors(); - + // there are some results if (hits.length > 0) { String scrollId = response.getScrollId(); - + // if there's an id, try to setup next scroll if (scrollId != null) { // is all the content already retrieved? @@ -246,80 +247,69 @@ public class Scroller { scrollId = null; } } - return new SearchHitRowSetCursor(schema, exts, hits, query.limit(), scrollId); + return new InitialSearchHitRowSet(schema, exts, hits, query.limit(), scrollId); } // no hits else { clearScroll(response.getScrollId()); - // typically means last page but might be an aggs only query - return needsHit(exts) ? Rows.empty(schema) : new SearchHitRowSetCursor(schema, exts); + return Rows.empty(schema); } } - - private static boolean needsHit(List exts) { - for (HitExtractor ext : exts) { - // Anything non-constant requires extraction - if (!(ext instanceof ConstantExtractor)) { - return true; - } - } - return false; - } - + private List getExtractors() { // create response extractors for the first time List refs = query.columns(); - + List exts = new ArrayList<>(refs.size()); - + for (ColumnReference ref : refs) { exts.add(createExtractor(ref)); } return exts; } - + private HitExtractor createExtractor(ColumnReference ref) { if (ref instanceof SearchHitFieldRef) { SearchHitFieldRef f = (SearchHitFieldRef) ref; return f.useDocValue() ? new DocValueExtractor(f.name()) : new SourceExtractor(f.name()); } - + if (ref instanceof NestedFieldRef) { NestedFieldRef f = (NestedFieldRef) ref; return new InnerHitExtractor(f.parent(), f.name(), f.useDocValue()); } - + if (ref instanceof ScriptFieldRef) { ScriptFieldRef f = (ScriptFieldRef) ref; return new DocValueExtractor(f.name()); } - + if (ref instanceof ComputedRef) { ProcessorDefinition proc = ((ComputedRef) ref).processor(); proc = proc.transformDown(l -> new HitExtractorInput(l.expression(), createExtractor(l.context())), ReferenceInput.class); return new ComputingHitExtractor(proc.asProcessor()); } - + throw new SqlIllegalArgumentException("Unexpected ValueReference %s", ref.getClass()); } } - + abstract static class ScrollerActionListener implements ActionListener { - - final ActionListener listener; - + + final ActionListener listener; + final Client client; final TimeValue keepAlive; final Schema schema; - - ScrollerActionListener(ActionListener listener, Client client, TimeValue keepAlive, Schema schema) { + + ScrollerActionListener(ActionListener listener, Client client, TimeValue keepAlive, Schema schema) { this.listener = listener; - + this.client = client; this.keepAlive = keepAlive; this.schema = schema; } - + // TODO: need to handle rejections plus check failures (shard size, etc...) @Override public void onResponse(final SearchResponse response) { @@ -333,19 +323,19 @@ public class Scroller { onFailure(ex); } } - - protected abstract RowSet handleResponse(SearchResponse response); - + + protected abstract SchemaRowSet handleResponse(SearchResponse response); + protected final void clearScroll(String scrollId) { if (scrollId != null) { // fire and forget client.prepareClearScroll().addScrollId(scrollId).execute(); } } - + @Override public final void onFailure(Exception ex) { listener.onFailure(ex); } } -} \ No newline at end of file +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java index dc143eed94c..d02726ed582 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/LocalRelation.java @@ -8,7 +8,7 @@ package org.elasticsearch.xpack.sql.plan.logical; import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.session.Executable; -import org.elasticsearch.xpack.sql.session.RowSet; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; @@ -41,7 +41,7 @@ public class LocalRelation extends LogicalPlan implements Executable { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { executable.execute(session, listener); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java index c5439e44260..acf6251217f 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Debug.java @@ -14,6 +14,7 @@ import org.elasticsearch.xpack.sql.rule.RuleExecutor.ExecutionInfo; import org.elasticsearch.xpack.sql.rule.RuleExecutor.Transformation; import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.tree.Node; @@ -69,7 +70,7 @@ public class Debug extends Command { @SuppressWarnings({ "rawtypes", "unchecked" }) @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { String planString = null; ExecutionInfo info = null; @@ -86,7 +87,7 @@ public class Debug extends Command { default: break; } - + if (format == Format.TEXT) { StringBuilder sb = new StringBuilder(); if (info == null) { @@ -149,8 +150,8 @@ public class Debug extends Command { return false; } Debug o = (Debug) obj; - return Objects.equals(format, o.format) + return Objects.equals(format, o.format) && Objects.equals(type, o.type) && Objects.equals(plan, o.plan); } -} \ No newline at end of file +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java index 7f84ee22d4d..3a75e6f852f 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/Explain.java @@ -14,6 +14,7 @@ import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.sql.planner.Planner; import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.type.DataTypes; @@ -57,7 +58,7 @@ public class Explain extends Command { public boolean verify() { return verify; } - + public Format format() { return format; } @@ -72,7 +73,7 @@ public class Explain extends Command { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { String planString = null; String planName = "Parsed"; @@ -82,7 +83,7 @@ public class Explain extends Command { PhysicalPlan mappedPlan = null, executionPlan = null; Planner planner = session.planner(); - + // verification is on, exceptions can be thrown if (verify) { optimizedPlan = session.optimizedPlan(plan); @@ -102,7 +103,7 @@ public class Explain extends Command { } } } - + if (format == Format.TEXT) { StringBuilder sb = new StringBuilder(); sb.append("Parsed\n"); @@ -120,7 +121,7 @@ public class Explain extends Command { sb.append("\nExecutable\n"); sb.append("---------\n"); sb.append(executionPlan.toString()); - + planString = sb.toString(); } else { Map> plans = new HashMap<>(); @@ -183,8 +184,8 @@ public class Explain extends Command { } Explain o = (Explain) obj; return Objects.equals(verify, o.verify) - && Objects.equals(format, o.format) + && Objects.equals(format, o.format) && Objects.equals(type, o.type) && Objects.equals(plan, o.plan); } -} \ No newline at end of file +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java index e83815916bd..504f0c4421b 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowColumns.java @@ -9,8 +9,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; -import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.type.CompoundDataType; @@ -45,7 +45,7 @@ public class ShowColumns extends Command { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { session.getIndices(new String[]{index}, IndicesOptions.strictExpandOpenAndForbidClosed(), ActionListener.wrap( esIndices -> { List> rows = new ArrayList<>(); @@ -91,4 +91,4 @@ public class ShowColumns extends Command { ShowColumns other = (ShowColumns) obj; return Objects.equals(index, other.index); } -} \ No newline at end of file +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java index b17734853ec..ecf676fbc33 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowFunctions.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.sql.expression.function.FunctionDefinition; import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry; import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.type.DataTypes; @@ -43,10 +44,10 @@ public class ShowFunctions extends Command { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { FunctionRegistry registry = session.functionRegistry(); Collection functions = registry.listFunctions(pattern); - + listener.onResponse(Rows.of(output(), functions.stream() .map(f -> asList(f.name(), f.type().name())) .collect(toList()))); @@ -62,11 +63,11 @@ public class ShowFunctions extends Command { if (this == obj) { return true; } - + if (obj == null || getClass() != obj.getClass()) { return false; } - + ShowFunctions other = (ShowFunctions) obj; return Objects.equals(pattern, other.pattern); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java index 9e971614865..919448607ba 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowSchemas.java @@ -10,6 +10,7 @@ import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.type.DataTypes; @@ -30,7 +31,7 @@ public class ShowSchemas extends Command { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { listener.onResponse(Rows.empty(output())); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java index 93b5eeb6ff8..5daa10a434e 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/ShowTables.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.type.DataTypes; @@ -45,7 +46,7 @@ public class ShowTables extends Command { } @Override - public final void execute(SqlSession session, ActionListener listener) { + public final void execute(SqlSession session, ActionListener listener) { String pattern = Strings.hasText(this.pattern) ? StringUtils.jdbcToEsPattern(this.pattern) : "*"; session.getIndices(new String[] {pattern}, IndicesOptions.lenientExpandOpen(), ActionListener.wrap(result -> { listener.onResponse(Rows.of(output(), result.stream() @@ -64,12 +65,12 @@ public class ShowTables extends Command { if (this == obj) { return true; } - + if (obj == null || getClass() != obj.getClass()) { return false; } - + ShowTables other = (ShowTables) obj; return Objects.equals(pattern, other.pattern); } -} \ No newline at end of file +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java index 9f2a1e4534d..11bab8d1017 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/CommandExec.java @@ -8,7 +8,7 @@ package org.elasticsearch.xpack.sql.plan.physical; import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.plan.logical.command.Command; -import org.elasticsearch.xpack.sql.session.RowSet; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import java.util.List; @@ -28,7 +28,7 @@ public class CommandExec extends LeafExec { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { command.execute(session, listener); } @@ -47,12 +47,12 @@ public class CommandExec extends LeafExec { if (this == obj) { return true; } - + if (obj == null || getClass() != obj.getClass()) { return false; } - + CommandExec other = (CommandExec) obj; return Objects.equals(command, other.command); } -} \ No newline at end of file +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java index d2544e52557..f68bfbf5446 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/EsQueryExec.java @@ -11,6 +11,7 @@ import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.querydsl.container.QueryContainer; import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.Rows; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; @@ -47,9 +48,9 @@ public class EsQueryExec extends LeafExec { public List output() { return output; } - + @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { Scroller scroller = new Scroller(session.client(), session.settings()); scroller.scroll(Rows.schema(output), queryContainer, index, listener); } @@ -79,4 +80,4 @@ public class EsQueryExec extends LeafExec { public String nodeString() { return nodeName() + "[" + index + "," + queryContainer + "]"; } -} \ No newline at end of file +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java index 839a2f4cfb5..d27bc97ed5d 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/LocalExec.java @@ -9,7 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.session.EmptyExecutable; import org.elasticsearch.xpack.sql.session.Executable; -import org.elasticsearch.xpack.sql.session.RowSet; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.tree.Location; @@ -39,7 +39,7 @@ public class LocalExec extends LeafExec { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { executable.execute(session, listener); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java index 21a6c56d7cf..df351c2115c 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plan/physical/Unexecutable.java @@ -8,7 +8,7 @@ package org.elasticsearch.xpack.sql.plan.physical; import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.planner.PlanningException; import org.elasticsearch.xpack.sql.session.Executable; -import org.elasticsearch.xpack.sql.session.RowSet; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.session.SqlSession; import java.util.Locale; @@ -18,7 +18,7 @@ import static java.lang.String.format; // this is mainly a marker interface to validate a plan before being executed public interface Unexecutable extends Executable { - default void execute(SqlSession session, ActionListener listener) { + default void execute(SqlSession session, ActionListener listener) { throw new PlanningException(format(Locale.ROOT, "Current plan %s is not executable", this)); } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java index 75659787783..07fb17e953e 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/TransportSqlAction.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse.ColumnInfo; import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.session.RowSet; +import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.type.Schema; import java.util.ArrayList; @@ -56,35 +57,35 @@ public class TransportSqlAction extends HandledTransportAction listener.onResponse(createResponse(true, cursor)), listener::onFailure)); + ActionListener.wrap(rowSet -> listener.onResponse(createResponse(rowSet)), listener::onFailure)); } else { planExecutor.nextPage(request.cursor(), - ActionListener.wrap(cursor -> listener.onResponse(createResponse(false, cursor)), listener::onFailure)); + ActionListener.wrap(rowSet -> listener.onResponse(createResponse(rowSet, null)), listener::onFailure)); } } - static SqlResponse createResponse(boolean includeColumnMetadata, RowSet cursor) { - List columns = null; - if (includeColumnMetadata) { - columns = new ArrayList<>(cursor.schema().types().size()); - for (Schema.Entry entry : cursor.schema()) { - columns.add(new ColumnInfo(entry.name(), entry.type().esName(), entry.type().sqlType(), entry.type().displaySize())); - } - columns = unmodifiableList(columns); + static SqlResponse createResponse(SchemaRowSet rowSet) { + List columns = new ArrayList<>(rowSet.columnCount()); + for (Schema.Entry entry : rowSet.schema()) { + columns.add(new ColumnInfo(entry.name(), entry.type().esName(), entry.type().sqlType(), entry.type().displaySize())); } + columns = unmodifiableList(columns); + return createResponse(rowSet, columns); + } + static SqlResponse createResponse(RowSet rowSet, List columns) { List> rows = new ArrayList<>(); - cursor.forEachRow(rowView -> { - List row = new ArrayList<>(rowView.rowSize()); + rowSet.forEachRow(rowView -> { + List row = new ArrayList<>(rowView.columnCount()); rowView.forEachColumn(row::add); rows.add(unmodifiableList(row)); }); return new SqlResponse( - cursor.nextPageCursor(), - cursor.size(), - cursor.rowSize(), + rowSet.nextPageCursor(), + rowSet.size(), + rowSet.columnCount(), columns, rows); } -} \ No newline at end of file +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/AbstractRowSet.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/AbstractRowSet.java index 283bf7c9392..d6b7a02d9c4 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/AbstractRowSet.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/AbstractRowSet.java @@ -5,25 +5,15 @@ */ package org.elasticsearch.xpack.sql.session; -import org.elasticsearch.xpack.sql.type.Schema; import org.elasticsearch.xpack.sql.util.Assert; public abstract class AbstractRowSet implements RowSet { - - private final Schema schema; - private final int size; - private boolean terminated = false; - protected AbstractRowSet(Schema schema) { - this.schema = schema; - this.size = schema().names().size(); - } - @Override public Object column(int index) { Assert.isTrue(index >= 0, "Invalid index %d; needs to be positive", index); - Assert.isTrue(index < rowSize(), "Invalid index %d for row of size %d", index, rowSize()); + Assert.isTrue(index < columnCount(), "Invalid index %d for row of size %d", index, columnCount()); Assert.isTrue(hasCurrentRow(), "RowSet contains no (more) entries; use hasCurrent() to check its status"); return getColumn(index); } @@ -59,22 +49,12 @@ public abstract class AbstractRowSet implements RowSet { protected abstract void doReset(); - @Override - public int rowSize() { - return size; - } - - @Override - public Schema schema() { - return schema; - } - @Override public String toString() { StringBuilder sb = new StringBuilder(); if (hasCurrentRow()) { - for (int column = 0; column < size; column++) { + for (int column = 0; column < columnCount(); column++) { if (column > 0) { sb.append("|"); } @@ -83,7 +63,7 @@ public abstract class AbstractRowSet implements RowSet { // the value might contain multiple lines (plan execution for example) // TODO: this needs to be improved to properly scale each row across multiple lines String[] split = val.split("\\n"); - + for (int splitIndex = 0; splitIndex < split.length; splitIndex++) { if (splitIndex > 0) { sb.append("\n"); @@ -97,4 +77,4 @@ public abstract class AbstractRowSet implements RowSet { return sb.toString(); } -} \ No newline at end of file +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java index fb9df0cf942..09e0d3ac2a3 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyExecutable.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.sql.session; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.sql.expression.Attribute; import java.util.List; @@ -24,7 +25,7 @@ public class EmptyExecutable implements Executable { } @Override - public void execute(SqlSession session, org.elasticsearch.action.ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { listener.onResponse(Rows.empty(output)); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSetCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSetCursor.java index f32d73dd63d..7e943931e91 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSetCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyRowSetCursor.java @@ -7,10 +7,11 @@ package org.elasticsearch.xpack.sql.session; import org.elasticsearch.xpack.sql.type.Schema; -class EmptyRowSetCursor extends AbstractRowSet { +class EmptyRowSetCursor extends AbstractRowSet implements SchemaRowSet { + private final Schema schema; EmptyRowSetCursor(Schema schema) { - super(schema); + this.schema = schema; } @Override @@ -42,4 +43,9 @@ class EmptyRowSetCursor extends AbstractRowSet { public Cursor nextPageCursor() { return Cursor.EMPTY; } + + @Override + public Schema schema() { + return schema; + } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java index d6c9652da2d..dbc16317029 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Executable.java @@ -14,5 +14,5 @@ public interface Executable { List output(); - void execute(SqlSession session, ActionListener listener); + void execute(SqlSession session, ActionListener listener); } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSetCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSetCursor.java index 1c31e96b74b..39987d21ac6 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSetCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/ListRowSetCursor.java @@ -9,13 +9,14 @@ import org.elasticsearch.xpack.sql.type.Schema; import java.util.List; -class ListRowSetCursor extends AbstractRowSet { +class ListRowSetCursor extends AbstractRowSet implements SchemaRowSet { + private final Schema schema; private final List> list; private int pos = 0; ListRowSetCursor(Schema schema, List> list) { - super(schema); + this.schema = schema; this.list = list; } @@ -52,4 +53,9 @@ class ListRowSetCursor extends AbstractRowSet { public Cursor nextPageCursor() { return Cursor.EMPTY; } + + @Override + public Schema schema() { + return schema; + } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java index 957ac5f2fec..ff74d28d266 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowSet.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.sql.session; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.sql.type.Schema; import org.elasticsearch.xpack.sql.execution.PlanExecutor; import java.util.function.Consumer; diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowView.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowView.java index 9131af20751..c37b018d524 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowView.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/RowView.java @@ -5,8 +5,6 @@ */ package org.elasticsearch.xpack.sql.session; -import org.elasticsearch.xpack.sql.type.Schema; - import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Objects; @@ -17,12 +15,10 @@ import java.util.function.Consumer; * Offers access to the data but it shouldn't be held since it is not a data container. */ public interface RowView extends Iterable { - - Schema schema(); - - default int rowSize() { - return schema().names().size(); - } + /** + * Number of columns in this row. + */ + int columnCount(); Object column(int index); @@ -37,7 +33,7 @@ public interface RowView extends Iterable { default void forEachColumn(Consumer action) { Objects.requireNonNull(action); - int rowSize = rowSize(); + int rowSize = columnCount(); for (int i = 0; i < rowSize; i++) { action.accept(column(i)); } @@ -47,8 +43,8 @@ public interface RowView extends Iterable { default Iterator iterator() { return new Iterator() { private int pos = 0; - private final int rowSize = rowSize(); - + private final int rowSize = columnCount(); + @Override public boolean hasNext() { return pos < rowSize; diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Rows.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Rows.java index 98f936922de..34395dbe837 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Rows.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/Rows.java @@ -29,7 +29,7 @@ public abstract class Rows { } return new Schema(names, types); } - + public static Schema schema(String n1, DataType t1) { return new Schema(singletonList(n1), singletonList(t1)); } @@ -50,11 +50,11 @@ public abstract class Rows { return new Schema(asList(n1, n2, n3, n4, n5), asList(t1, t2, t3, t4, t5)); } - public static RowSet of(List attrs, List> values) { + public static SchemaRowSet of(List attrs, List> values) { if (values.isEmpty()) { return empty(attrs); } - + if (values.size() == 1) { return singleton(attrs, values.get(0).toArray()); } @@ -63,16 +63,16 @@ public abstract class Rows { return new ListRowSetCursor(schema, values); } - public static RowSet singleton(List attrs, Object... values) { + public static SchemaRowSet singleton(List attrs, Object... values) { Assert.isTrue(attrs.size() == values.length, "Schema %s and values %s are out of sync", attrs, values); return new SingletonRowSet(schema(attrs), values); } - public static RowSet empty(Schema schema) { + public static SchemaRowSet empty(Schema schema) { return new EmptyRowSetCursor(schema); } - public static RowSet empty(List attrs) { + public static SchemaRowSet empty(List attrs) { return new EmptyRowSetCursor(schema(attrs)); } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SchemaRowSet.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SchemaRowSet.java new file mode 100644 index 00000000000..88c89b40543 --- /dev/null +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SchemaRowSet.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.session; + +import org.elasticsearch.xpack.sql.type.Schema; + +/** + * A {@linkplain RowSet} with the {@link Schema} for the results + * attached. + */ +public interface SchemaRowSet extends RowSet { + /** + * Schema for the results. + */ + Schema schema(); + + @Override + default int columnCount() { + return schema().names().size(); + } +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java index c93150c6ee2..9785286e674 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonExecutable.java @@ -28,7 +28,7 @@ public class SingletonExecutable implements Executable { } @Override - public void execute(SqlSession session, ActionListener listener) { + public void execute(SqlSession session, ActionListener listener) { listener.onResponse(Rows.singleton(output, values)); } @@ -42,4 +42,4 @@ public class SingletonExecutable implements Executable { } return sb.toString(); } -} \ No newline at end of file +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java index 4236dbb1b47..c8a4e5eddfb 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SingletonRowSet.java @@ -8,12 +8,13 @@ package org.elasticsearch.xpack.sql.session; import org.elasticsearch.xpack.sql.type.Schema; //TODO is it worth keeping this when we have ListRowSet? -class SingletonRowSet extends AbstractRowSet { +class SingletonRowSet extends AbstractRowSet implements SchemaRowSet { + private final Schema schema; private final Object[] values; SingletonRowSet(Schema schema, Object[] values) { - super(schema); + this.schema = schema; this.values = values; } @@ -46,4 +47,9 @@ class SingletonRowSet extends AbstractRowSet { public Cursor nextPageCursor() { return Cursor.EMPTY; } -} \ No newline at end of file + + @Override + public Schema schema() { + return schema; + } +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java index faf3e3072d2..910a71c61b7 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/SqlSession.java @@ -51,8 +51,8 @@ public class SqlSession { } public SqlSession(Configuration defaults, Client client, - Catalog catalog, FunctionRegistry functionRegistry, - SqlParser parser, + Catalog catalog, FunctionRegistry functionRegistry, + SqlParser parser, Optimizer optimizer, Planner planner) { this.client = client; @@ -133,7 +133,7 @@ public class SqlSession { } } - public void sql(String sql, ActionListener listener) { + public void sql(String sql, ActionListener listener) { executable(sql).execute(this, listener); } @@ -145,7 +145,7 @@ public class SqlSession { return settings; } - public void execute(PhysicalPlan plan, ActionListener listener) { + public void execute(PhysicalPlan plan, ActionListener listener) { plan.execute(this, listener); } -} \ No newline at end of file +}