SQL: Make schema optional in RowSet (elastic/x-pack-elasticsearch#2834)

This prevents us from having to hack a fake schema together on the
second (and third and fourth, etc) page of results.

Original commit: elastic/x-pack-elasticsearch@7ba3119daa
This commit is contained in:
Nik Everett 2017-11-04 22:52:13 +00:00 committed by GitHub
parent 49b295296e
commit 6ce140cf0f
31 changed files with 272 additions and 200 deletions

View File

@ -21,6 +21,7 @@ import org.elasticsearch.xpack.sql.planner.Planner;
import org.elasticsearch.xpack.sql.planner.PlanningException; import org.elasticsearch.xpack.sql.planner.PlanningException;
import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.Configuration;
@ -67,11 +68,11 @@ public class PlanExecutor {
} }
} }
public void sql(String sql, ActionListener<RowSet> listener) { public void sql(String sql, ActionListener<SchemaRowSet> listener) {
sql(Configuration.DEFAULT, sql, listener); sql(Configuration.DEFAULT, sql, listener);
} }
public void sql(Configuration sqlSettings, String sql, ActionListener<RowSet> listener) { public void sql(Configuration sqlSettings, String sql, ActionListener<SchemaRowSet> listener) {
SqlSession session = newSession(sqlSettings); SqlSession session = newSession(sqlSettings);
try { try {
PhysicalPlan executable = session.executable(sql); PhysicalPlan executable = session.executable(sql);

View File

@ -11,7 +11,6 @@ import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.sql.session.AbstractRowSet; import org.elasticsearch.xpack.sql.session.AbstractRowSet;
import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.type.Schema;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
@ -21,7 +20,7 @@ import java.util.Set;
/** /**
* Extracts rows from an array of {@link SearchHit}. * Extracts rows from an array of {@link SearchHit}.
*/ */
public class SearchHitRowSetCursor extends AbstractRowSet { abstract class AbstractSearchHitRowSet extends AbstractRowSet {
private final SearchHit[] hits; private final SearchHit[] hits;
private final String scrollId; private final String scrollId;
private final List<HitExtractor> extractors; private final List<HitExtractor> extractors;
@ -33,12 +32,7 @@ public class SearchHitRowSetCursor extends AbstractRowSet {
private final int[] indexPerLevel; private final int[] indexPerLevel;
private int row = 0; private int row = 0;
SearchHitRowSetCursor(Schema schema, List<HitExtractor> exts) { AbstractSearchHitRowSet(List<HitExtractor> exts, SearchHit[] hits, int limitHits, String scrollId) {
this(schema, exts, SearchHits.EMPTY, -1, null);
}
SearchHitRowSetCursor(Schema schema, List<HitExtractor> exts, SearchHit[] hits, int limitHits, String scrollId) {
super(schema);
this.hits = hits; this.hits = hits;
this.scrollId = scrollId; this.scrollId = scrollId;
this.extractors = exts; this.extractors = exts;

View File

@ -7,19 +7,20 @@ package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.xpack.sql.session.AbstractRowSet; import org.elasticsearch.xpack.sql.session.AbstractRowSet;
import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.type.Schema; import org.elasticsearch.xpack.sql.type.Schema;
import java.util.List; import java.util.List;
import java.util.function.Supplier; import java.util.function.Supplier;
class AggsRowSet extends AbstractRowSet { class AggsRowSet extends AbstractRowSet implements SchemaRowSet {
private final Schema schema;
private int row = 0;
private final AggValues agg; private final AggValues agg;
private final List<Supplier<Object>> columns; private final List<Supplier<Object>> columns;
private int row = 0;
AggsRowSet(Schema schema, AggValues agg, List<Supplier<Object>> columns) { AggsRowSet(Schema schema, AggValues agg, List<Supplier<Object>> columns) {
super(schema); this.schema = schema;
this.agg = agg; this.agg = agg;
this.columns = columns; this.columns = columns;
} }
@ -53,4 +54,9 @@ class AggsRowSet extends AbstractRowSet {
public Cursor nextPageCursor() { public Cursor nextPageCursor() {
return Cursor.EMPTY; return Cursor.EMPTY;
} }
@Override
public Schema schema() {
return schema;
}
} }

View File

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.type.Schema;
import java.util.List;
/**
* Initial results from a scroll search. Distinct from the following pages
* because it has a {@link Schema} available. See {@link ScrolledSearchHitRowSet}
* for the next pages.
*/
public class InitialSearchHitRowSet extends AbstractSearchHitRowSet implements SchemaRowSet {
private final Schema schema;
public InitialSearchHitRowSet(Schema schema, List<HitExtractor> exts, SearchHit[] hits, int limitHits, String scrollId) {
super(exts, hits, limitHits, scrollId);
this.schema = schema;
}
@Override
public Schema schema() {
return schema;
}
}

View File

@ -110,15 +110,6 @@ public class ScrollCursor implements Cursor {
@Override @Override
public void nextPage(Client client, ActionListener<RowSet> listener) { public void nextPage(Client client, ActionListener<RowSet> listener) {
// Fake the schema for now. We'll try to remove the need later.
List<String> names = new ArrayList<>(extractors.size());
List<DataType> dataTypes = new ArrayList<>(extractors.size());
for (int i = 0; i < extractors.size(); i++) {
names.add("dummy");
dataTypes.add(null);
}
// NOCOMMIT make schema properly nullable for the second page
Schema schema = new Schema(names, dataTypes);
// NOCOMMIT add keep alive to the settings and pass it here // NOCOMMIT add keep alive to the settings and pass it here
/* Or something. The trouble is that settings is for *starting* /* Or something. The trouble is that settings is for *starting*
* queries, but maybe we should actually have two sets of settings, * queries, but maybe we should actually have two sets of settings,
@ -128,7 +119,7 @@ public class ScrollCursor implements Cursor {
SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(timeValueSeconds(90)); SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(timeValueSeconds(90));
client.searchScroll(request, ActionListener.wrap((SearchResponse response) -> { client.searchScroll(request, ActionListener.wrap((SearchResponse response) -> {
int limitHits = limit; int limitHits = limit;
listener.onResponse(new SearchHitRowSetCursor(schema, extractors, response.getHits().getHits(), listener.onResponse(new ScrolledSearchHitRowSet(extractors, response.getHits().getHits(),
limitHits, response.getScrollId())); limitHits, response.getScrollId()));
}, listener::onFailure)); }, listener::onFailure));
} }

View File

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.sql.type.Schema;
import java.util.List;
/**
* "Next" page of results from a scroll search. Distinct from the first page
* because it no longer has the {@link Schema}. See {@link InitialSearchHitRowSet}
* for the initial results.
*/
public class ScrolledSearchHitRowSet extends AbstractSearchHitRowSet {
private final int columnCount;
public ScrolledSearchHitRowSet(List<HitExtractor> exts, SearchHit[] hits, int limitHits, String scrollId) {
super(exts, hits, limitHits, scrollId);
this.columnCount = exts.size();
}
@Override
public int columnCount() {
return columnCount;
}
}

View File

@ -47,6 +47,7 @@ import org.elasticsearch.xpack.sql.querydsl.container.TotalCountRef;
import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.type.Schema; import org.elasticsearch.xpack.sql.type.Schema;
import org.elasticsearch.xpack.sql.util.StringUtils; import org.elasticsearch.xpack.sql.util.StringUtils;
@ -73,7 +74,7 @@ public class Scroller {
this.size = size; this.size = size;
} }
public void scroll(Schema schema, QueryContainer query, String index, ActionListener<RowSet> listener) { public void scroll(Schema schema, QueryContainer query, String index, ActionListener<SchemaRowSet> listener) {
// prepare the request // prepare the request
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, size); SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, size);
@ -100,13 +101,13 @@ public class Scroller {
private final QueryContainer query; private final QueryContainer query;
AggsScrollActionListener(ActionListener<RowSet> listener, Client client, TimeValue keepAlive, Schema schema, QueryContainer query) { AggsScrollActionListener(ActionListener<SchemaRowSet> listener, Client client, TimeValue keepAlive, Schema schema, QueryContainer query) {
super(listener, client, keepAlive, schema); super(listener, client, keepAlive, schema);
this.query = query; this.query = query;
} }
@Override @Override
protected RowSet handleResponse(SearchResponse response) { protected SchemaRowSet handleResponse(SearchResponse response) {
final List<Object[]> extractedAggs = new ArrayList<>(); final List<Object[]> extractedAggs = new ArrayList<>();
AggValues aggValues = new AggValues(extractedAggs); AggValues aggValues = new AggValues(extractedAggs);
@ -215,7 +216,7 @@ public class Scroller {
static class HandshakeScrollActionListener extends ScrollerActionListener { static class HandshakeScrollActionListener extends ScrollerActionListener {
private final QueryContainer query; private final QueryContainer query;
HandshakeScrollActionListener(ActionListener<RowSet> listener, Client client, TimeValue keepAlive, HandshakeScrollActionListener(ActionListener<SchemaRowSet> listener, Client client, TimeValue keepAlive,
Schema schema, QueryContainer query) { Schema schema, QueryContainer query) {
super(listener, client, keepAlive, schema); super(listener, client, keepAlive, schema);
this.query = query; this.query = query;
@ -226,7 +227,7 @@ public class Scroller {
super.onResponse(response); super.onResponse(response);
} }
protected RowSet handleResponse(SearchResponse response) { protected SchemaRowSet handleResponse(SearchResponse response) {
SearchHit[] hits = response.getHits().getHits(); SearchHit[] hits = response.getHits().getHits();
List<HitExtractor> exts = getExtractors(); List<HitExtractor> exts = getExtractors();
@ -246,26 +247,15 @@ public class Scroller {
scrollId = null; scrollId = null;
} }
} }
return new SearchHitRowSetCursor(schema, exts, hits, query.limit(), scrollId); return new InitialSearchHitRowSet(schema, exts, hits, query.limit(), scrollId);
} }
// no hits // no hits
else { else {
clearScroll(response.getScrollId()); clearScroll(response.getScrollId());
// typically means last page but might be an aggs only query return Rows.empty(schema);
return needsHit(exts) ? Rows.empty(schema) : new SearchHitRowSetCursor(schema, exts);
} }
} }
private static boolean needsHit(List<HitExtractor> exts) {
for (HitExtractor ext : exts) {
// Anything non-constant requires extraction
if (!(ext instanceof ConstantExtractor)) {
return true;
}
}
return false;
}
private List<HitExtractor> getExtractors() { private List<HitExtractor> getExtractors() {
// create response extractors for the first time // create response extractors for the first time
List<ColumnReference> refs = query.columns(); List<ColumnReference> refs = query.columns();
@ -306,13 +296,13 @@ public class Scroller {
abstract static class ScrollerActionListener implements ActionListener<SearchResponse> { abstract static class ScrollerActionListener implements ActionListener<SearchResponse> {
final ActionListener<RowSet> listener; final ActionListener<SchemaRowSet> listener;
final Client client; final Client client;
final TimeValue keepAlive; final TimeValue keepAlive;
final Schema schema; final Schema schema;
ScrollerActionListener(ActionListener<RowSet> listener, Client client, TimeValue keepAlive, Schema schema) { ScrollerActionListener(ActionListener<SchemaRowSet> listener, Client client, TimeValue keepAlive, Schema schema) {
this.listener = listener; this.listener = listener;
this.client = client; this.client = client;
@ -334,7 +324,7 @@ public class Scroller {
} }
} }
protected abstract RowSet handleResponse(SearchResponse response); protected abstract SchemaRowSet handleResponse(SearchResponse response);
protected final void clearScroll(String scrollId) { protected final void clearScroll(String scrollId) {
if (scrollId != null) { if (scrollId != null) {

View File

@ -8,7 +8,7 @@ package org.elasticsearch.xpack.sql.plan.logical;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.session.Executable; import org.elasticsearch.xpack.sql.session.Executable;
import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.tree.Location;
@ -41,7 +41,7 @@ public class LocalRelation extends LogicalPlan implements Executable {
} }
@Override @Override
public void execute(SqlSession session, ActionListener<RowSet> listener) { public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
executable.execute(session, listener); executable.execute(session, listener);
} }

View File

@ -14,6 +14,7 @@ import org.elasticsearch.xpack.sql.rule.RuleExecutor.ExecutionInfo;
import org.elasticsearch.xpack.sql.rule.RuleExecutor.Transformation; import org.elasticsearch.xpack.sql.rule.RuleExecutor.Transformation;
import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.tree.Node; import org.elasticsearch.xpack.sql.tree.Node;
@ -69,7 +70,7 @@ public class Debug extends Command {
@SuppressWarnings({ "rawtypes", "unchecked" }) @SuppressWarnings({ "rawtypes", "unchecked" })
@Override @Override
public void execute(SqlSession session, ActionListener<RowSet> listener) { public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
String planString = null; String planString = null;
ExecutionInfo info = null; ExecutionInfo info = null;

View File

@ -14,6 +14,7 @@ import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.sql.planner.Planner; import org.elasticsearch.xpack.sql.planner.Planner;
import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.DataTypes; import org.elasticsearch.xpack.sql.type.DataTypes;
@ -72,7 +73,7 @@ public class Explain extends Command {
} }
@Override @Override
public void execute(SqlSession session, ActionListener<RowSet> listener) { public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
String planString = null; String planString = null;
String planName = "Parsed"; String planName = "Parsed";

View File

@ -9,8 +9,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.CompoundDataType; import org.elasticsearch.xpack.sql.type.CompoundDataType;
@ -45,7 +45,7 @@ public class ShowColumns extends Command {
} }
@Override @Override
public void execute(SqlSession session, ActionListener<RowSet> listener) { public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
session.getIndices(new String[]{index}, IndicesOptions.strictExpandOpenAndForbidClosed(), ActionListener.wrap( session.getIndices(new String[]{index}, IndicesOptions.strictExpandOpenAndForbidClosed(), ActionListener.wrap(
esIndices -> { esIndices -> {
List<List<?>> rows = new ArrayList<>(); List<List<?>> rows = new ArrayList<>();

View File

@ -12,6 +12,7 @@ import org.elasticsearch.xpack.sql.expression.function.FunctionDefinition;
import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry; import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.DataTypes; import org.elasticsearch.xpack.sql.type.DataTypes;
@ -43,7 +44,7 @@ public class ShowFunctions extends Command {
} }
@Override @Override
public void execute(SqlSession session, ActionListener<RowSet> listener) { public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
FunctionRegistry registry = session.functionRegistry(); FunctionRegistry registry = session.functionRegistry();
Collection<FunctionDefinition> functions = registry.listFunctions(pattern); Collection<FunctionDefinition> functions = registry.listFunctions(pattern);

View File

@ -10,6 +10,7 @@ import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.DataTypes; import org.elasticsearch.xpack.sql.type.DataTypes;
@ -30,7 +31,7 @@ public class ShowSchemas extends Command {
} }
@Override @Override
public void execute(SqlSession session, ActionListener<RowSet> listener) { public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
listener.onResponse(Rows.empty(output())); listener.onResponse(Rows.empty(output()));
} }

View File

@ -13,6 +13,7 @@ import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute; import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.DataTypes; import org.elasticsearch.xpack.sql.type.DataTypes;
@ -45,7 +46,7 @@ public class ShowTables extends Command {
} }
@Override @Override
public final void execute(SqlSession session, ActionListener<RowSet> listener) { public final void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
String pattern = Strings.hasText(this.pattern) ? StringUtils.jdbcToEsPattern(this.pattern) : "*"; String pattern = Strings.hasText(this.pattern) ? StringUtils.jdbcToEsPattern(this.pattern) : "*";
session.getIndices(new String[] {pattern}, IndicesOptions.lenientExpandOpen(), ActionListener.wrap(result -> { session.getIndices(new String[] {pattern}, IndicesOptions.lenientExpandOpen(), ActionListener.wrap(result -> {
listener.onResponse(Rows.of(output(), result.stream() listener.onResponse(Rows.of(output(), result.stream()

View File

@ -8,7 +8,7 @@ package org.elasticsearch.xpack.sql.plan.physical;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.plan.logical.command.Command; import org.elasticsearch.xpack.sql.plan.logical.command.Command;
import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.session.SqlSession;
import java.util.List; import java.util.List;
@ -28,7 +28,7 @@ public class CommandExec extends LeafExec {
} }
@Override @Override
public void execute(SqlSession session, ActionListener<RowSet> listener) { public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
command.execute(session, listener); command.execute(session, listener);
} }

View File

@ -11,6 +11,7 @@ import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.querydsl.container.QueryContainer; import org.elasticsearch.xpack.sql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows; import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.tree.Location;
@ -49,7 +50,7 @@ public class EsQueryExec extends LeafExec {
} }
@Override @Override
public void execute(SqlSession session, ActionListener<RowSet> listener) { public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
Scroller scroller = new Scroller(session.client(), session.settings()); Scroller scroller = new Scroller(session.client(), session.settings());
scroller.scroll(Rows.schema(output), queryContainer, index, listener); scroller.scroll(Rows.schema(output), queryContainer, index, listener);
} }

View File

@ -9,7 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.session.EmptyExecutable; import org.elasticsearch.xpack.sql.session.EmptyExecutable;
import org.elasticsearch.xpack.sql.session.Executable; import org.elasticsearch.xpack.sql.session.Executable;
import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location; import org.elasticsearch.xpack.sql.tree.Location;
@ -39,7 +39,7 @@ public class LocalExec extends LeafExec {
} }
@Override @Override
public void execute(SqlSession session, ActionListener<RowSet> listener) { public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
executable.execute(session, listener); executable.execute(session, listener);
} }

View File

@ -8,7 +8,7 @@ package org.elasticsearch.xpack.sql.plan.physical;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.planner.PlanningException; import org.elasticsearch.xpack.sql.planner.PlanningException;
import org.elasticsearch.xpack.sql.session.Executable; import org.elasticsearch.xpack.sql.session.Executable;
import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession; import org.elasticsearch.xpack.sql.session.SqlSession;
import java.util.Locale; import java.util.Locale;
@ -18,7 +18,7 @@ import static java.lang.String.format;
// this is mainly a marker interface to validate a plan before being executed // this is mainly a marker interface to validate a plan before being executed
public interface Unexecutable extends Executable { public interface Unexecutable extends Executable {
default void execute(SqlSession session, ActionListener<RowSet> listener) { default void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
throw new PlanningException(format(Locale.ROOT, "Current plan %s is not executable", this)); throw new PlanningException(format(Locale.ROOT, "Current plan %s is not executable", this));
} }
} }

View File

@ -19,6 +19,7 @@ import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse.ColumnInfo;
import org.elasticsearch.xpack.sql.session.Configuration; import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.type.Schema; import org.elasticsearch.xpack.sql.type.Schema;
import java.util.ArrayList; import java.util.ArrayList;
@ -56,34 +57,34 @@ public class TransportSqlAction extends HandledTransportAction<SqlRequest, SqlRe
Configuration cfg = new Configuration(request.timeZone(), request.fetchSize(), Configuration cfg = new Configuration(request.timeZone(), request.fetchSize(),
request.requestTimeout(), request.pageTimeout()); request.requestTimeout(), request.pageTimeout());
planExecutor.sql(cfg, request.query(), planExecutor.sql(cfg, request.query(),
ActionListener.wrap(cursor -> listener.onResponse(createResponse(true, cursor)), listener::onFailure)); ActionListener.wrap(rowSet -> listener.onResponse(createResponse(rowSet)), listener::onFailure));
} else { } else {
planExecutor.nextPage(request.cursor(), planExecutor.nextPage(request.cursor(),
ActionListener.wrap(cursor -> listener.onResponse(createResponse(false, cursor)), listener::onFailure)); ActionListener.wrap(rowSet -> listener.onResponse(createResponse(rowSet, null)), listener::onFailure));
} }
} }
static SqlResponse createResponse(boolean includeColumnMetadata, RowSet cursor) { static SqlResponse createResponse(SchemaRowSet rowSet) {
List<ColumnInfo> columns = null; List<ColumnInfo> columns = new ArrayList<>(rowSet.columnCount());
if (includeColumnMetadata) { for (Schema.Entry entry : rowSet.schema()) {
columns = new ArrayList<>(cursor.schema().types().size());
for (Schema.Entry entry : cursor.schema()) {
columns.add(new ColumnInfo(entry.name(), entry.type().esName(), entry.type().sqlType(), entry.type().displaySize())); columns.add(new ColumnInfo(entry.name(), entry.type().esName(), entry.type().sqlType(), entry.type().displaySize()));
} }
columns = unmodifiableList(columns); columns = unmodifiableList(columns);
return createResponse(rowSet, columns);
} }
static SqlResponse createResponse(RowSet rowSet, List<ColumnInfo> columns) {
List<List<Object>> rows = new ArrayList<>(); List<List<Object>> rows = new ArrayList<>();
cursor.forEachRow(rowView -> { rowSet.forEachRow(rowView -> {
List<Object> row = new ArrayList<>(rowView.rowSize()); List<Object> row = new ArrayList<>(rowView.columnCount());
rowView.forEachColumn(row::add); rowView.forEachColumn(row::add);
rows.add(unmodifiableList(row)); rows.add(unmodifiableList(row));
}); });
return new SqlResponse( return new SqlResponse(
cursor.nextPageCursor(), rowSet.nextPageCursor(),
cursor.size(), rowSet.size(),
cursor.rowSize(), rowSet.columnCount(),
columns, columns,
rows); rows);
} }

View File

@ -5,25 +5,15 @@
*/ */
package org.elasticsearch.xpack.sql.session; package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.xpack.sql.type.Schema;
import org.elasticsearch.xpack.sql.util.Assert; import org.elasticsearch.xpack.sql.util.Assert;
public abstract class AbstractRowSet implements RowSet { public abstract class AbstractRowSet implements RowSet {
private final Schema schema;
private final int size;
private boolean terminated = false; private boolean terminated = false;
protected AbstractRowSet(Schema schema) {
this.schema = schema;
this.size = schema().names().size();
}
@Override @Override
public Object column(int index) { public Object column(int index) {
Assert.isTrue(index >= 0, "Invalid index %d; needs to be positive", index); Assert.isTrue(index >= 0, "Invalid index %d; needs to be positive", index);
Assert.isTrue(index < rowSize(), "Invalid index %d for row of size %d", index, rowSize()); Assert.isTrue(index < columnCount(), "Invalid index %d for row of size %d", index, columnCount());
Assert.isTrue(hasCurrentRow(), "RowSet contains no (more) entries; use hasCurrent() to check its status"); Assert.isTrue(hasCurrentRow(), "RowSet contains no (more) entries; use hasCurrent() to check its status");
return getColumn(index); return getColumn(index);
} }
@ -59,22 +49,12 @@ public abstract class AbstractRowSet implements RowSet {
protected abstract void doReset(); protected abstract void doReset();
@Override
public int rowSize() {
return size;
}
@Override
public Schema schema() {
return schema;
}
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
if (hasCurrentRow()) { if (hasCurrentRow()) {
for (int column = 0; column < size; column++) { for (int column = 0; column < columnCount(); column++) {
if (column > 0) { if (column > 0) {
sb.append("|"); sb.append("|");
} }

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.sql.session; package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute; import org.elasticsearch.xpack.sql.expression.Attribute;
import java.util.List; import java.util.List;
@ -24,7 +25,7 @@ public class EmptyExecutable implements Executable {
} }
@Override @Override
public void execute(SqlSession session, org.elasticsearch.action.ActionListener<RowSet> listener) { public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
listener.onResponse(Rows.empty(output)); listener.onResponse(Rows.empty(output));
} }

View File

@ -7,10 +7,11 @@ package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.xpack.sql.type.Schema; import org.elasticsearch.xpack.sql.type.Schema;
class EmptyRowSetCursor extends AbstractRowSet { class EmptyRowSetCursor extends AbstractRowSet implements SchemaRowSet {
private final Schema schema;
EmptyRowSetCursor(Schema schema) { EmptyRowSetCursor(Schema schema) {
super(schema); this.schema = schema;
} }
@Override @Override
@ -42,4 +43,9 @@ class EmptyRowSetCursor extends AbstractRowSet {
public Cursor nextPageCursor() { public Cursor nextPageCursor() {
return Cursor.EMPTY; return Cursor.EMPTY;
} }
@Override
public Schema schema() {
return schema;
}
} }

View File

@ -14,5 +14,5 @@ public interface Executable {
List<Attribute> output(); List<Attribute> output();
void execute(SqlSession session, ActionListener<RowSet> listener); void execute(SqlSession session, ActionListener<SchemaRowSet> listener);
} }

View File

@ -9,13 +9,14 @@ import org.elasticsearch.xpack.sql.type.Schema;
import java.util.List; import java.util.List;
class ListRowSetCursor extends AbstractRowSet { class ListRowSetCursor extends AbstractRowSet implements SchemaRowSet {
private final Schema schema;
private final List<List<?>> list; private final List<List<?>> list;
private int pos = 0; private int pos = 0;
ListRowSetCursor(Schema schema, List<List<?>> list) { ListRowSetCursor(Schema schema, List<List<?>> list) {
super(schema); this.schema = schema;
this.list = list; this.list = list;
} }
@ -52,4 +53,9 @@ class ListRowSetCursor extends AbstractRowSet {
public Cursor nextPageCursor() { public Cursor nextPageCursor() {
return Cursor.EMPTY; return Cursor.EMPTY;
} }
@Override
public Schema schema() {
return schema;
}
} }

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.sql.session; package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.type.Schema;
import org.elasticsearch.xpack.sql.execution.PlanExecutor; import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import java.util.function.Consumer; import java.util.function.Consumer;

View File

@ -5,8 +5,6 @@
*/ */
package org.elasticsearch.xpack.sql.session; package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.xpack.sql.type.Schema;
import java.util.Iterator; import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Objects; import java.util.Objects;
@ -17,12 +15,10 @@ import java.util.function.Consumer;
* Offers access to the data but it shouldn't be held since it is not a data container. * Offers access to the data but it shouldn't be held since it is not a data container.
*/ */
public interface RowView extends Iterable<Object> { public interface RowView extends Iterable<Object> {
/**
Schema schema(); * Number of columns in this row.
*/
default int rowSize() { int columnCount();
return schema().names().size();
}
Object column(int index); Object column(int index);
@ -37,7 +33,7 @@ public interface RowView extends Iterable<Object> {
default void forEachColumn(Consumer<? super Object> action) { default void forEachColumn(Consumer<? super Object> action) {
Objects.requireNonNull(action); Objects.requireNonNull(action);
int rowSize = rowSize(); int rowSize = columnCount();
for (int i = 0; i < rowSize; i++) { for (int i = 0; i < rowSize; i++) {
action.accept(column(i)); action.accept(column(i));
} }
@ -47,7 +43,7 @@ public interface RowView extends Iterable<Object> {
default Iterator<Object> iterator() { default Iterator<Object> iterator() {
return new Iterator<Object>() { return new Iterator<Object>() {
private int pos = 0; private int pos = 0;
private final int rowSize = rowSize(); private final int rowSize = columnCount();
@Override @Override
public boolean hasNext() { public boolean hasNext() {

View File

@ -50,7 +50,7 @@ public abstract class Rows {
return new Schema(asList(n1, n2, n3, n4, n5), asList(t1, t2, t3, t4, t5)); return new Schema(asList(n1, n2, n3, n4, n5), asList(t1, t2, t3, t4, t5));
} }
public static RowSet of(List<Attribute> attrs, List<List<?>> values) { public static SchemaRowSet of(List<Attribute> attrs, List<List<?>> values) {
if (values.isEmpty()) { if (values.isEmpty()) {
return empty(attrs); return empty(attrs);
} }
@ -63,16 +63,16 @@ public abstract class Rows {
return new ListRowSetCursor(schema, values); return new ListRowSetCursor(schema, values);
} }
public static RowSet singleton(List<Attribute> attrs, Object... values) { public static SchemaRowSet singleton(List<Attribute> attrs, Object... values) {
Assert.isTrue(attrs.size() == values.length, "Schema %s and values %s are out of sync", attrs, values); Assert.isTrue(attrs.size() == values.length, "Schema %s and values %s are out of sync", attrs, values);
return new SingletonRowSet(schema(attrs), values); return new SingletonRowSet(schema(attrs), values);
} }
public static RowSet empty(Schema schema) { public static SchemaRowSet empty(Schema schema) {
return new EmptyRowSetCursor(schema); return new EmptyRowSetCursor(schema);
} }
public static RowSet empty(List<Attribute> attrs) { public static SchemaRowSet empty(List<Attribute> attrs) {
return new EmptyRowSetCursor(schema(attrs)); return new EmptyRowSetCursor(schema(attrs));
} }
} }

View File

@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.xpack.sql.type.Schema;
/**
* A {@linkplain RowSet} with the {@link Schema} for the results
* attached.
*/
public interface SchemaRowSet extends RowSet {
/**
* Schema for the results.
*/
Schema schema();
@Override
default int columnCount() {
return schema().names().size();
}
}

View File

@ -28,7 +28,7 @@ public class SingletonExecutable implements Executable {
} }
@Override @Override
public void execute(SqlSession session, ActionListener<RowSet> listener) { public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
listener.onResponse(Rows.singleton(output, values)); listener.onResponse(Rows.singleton(output, values));
} }

View File

@ -8,12 +8,13 @@ package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.xpack.sql.type.Schema; import org.elasticsearch.xpack.sql.type.Schema;
//TODO is it worth keeping this when we have ListRowSet? //TODO is it worth keeping this when we have ListRowSet?
class SingletonRowSet extends AbstractRowSet { class SingletonRowSet extends AbstractRowSet implements SchemaRowSet {
private final Schema schema;
private final Object[] values; private final Object[] values;
SingletonRowSet(Schema schema, Object[] values) { SingletonRowSet(Schema schema, Object[] values) {
super(schema); this.schema = schema;
this.values = values; this.values = values;
} }
@ -46,4 +47,9 @@ class SingletonRowSet extends AbstractRowSet {
public Cursor nextPageCursor() { public Cursor nextPageCursor() {
return Cursor.EMPTY; return Cursor.EMPTY;
} }
@Override
public Schema schema() {
return schema;
}
} }

View File

@ -133,7 +133,7 @@ public class SqlSession {
} }
} }
public void sql(String sql, ActionListener<RowSet> listener) { public void sql(String sql, ActionListener<SchemaRowSet> listener) {
executable(sql).execute(this, listener); executable(sql).execute(this, listener);
} }
@ -145,7 +145,7 @@ public class SqlSession {
return settings; return settings;
} }
public void execute(PhysicalPlan plan, ActionListener<RowSet> listener) { public void execute(PhysicalPlan plan, ActionListener<SchemaRowSet> listener) {
plan.execute(this, listener); plan.execute(this, listener);
} }
} }