`RowSetCursor` was just like `RowSet` only it had methods that allowed
you to scroll to the next page. We now use `RowSet#nextPageCursor` to
get the next page in a way that doesn't require us to store state on
the server. So we can remove `RowSetCursor` entirely now.

Original commit: elastic/x-pack-elasticsearch@6a4a1efb20
This commit is contained in:
Nik Everett 2017-09-25 17:35:03 -04:00 committed by GitHub
parent c7c79bc1c0
commit 4da12381bf
35 changed files with 172 additions and 277 deletions

View File

@ -20,7 +20,7 @@ import org.elasticsearch.xpack.sql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.sql.planner.Planner;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.session.SqlSettings;
@ -66,16 +66,16 @@ public class PlanExecutor {
}
}
public void sql(String sql, ActionListener<RowSetCursor> listener) {
public void sql(String sql, ActionListener<RowSet> listener) {
sql(SqlSettings.EMPTY, sql, listener);
}
public void sql(SqlSettings sqlSettings, String sql, ActionListener<RowSetCursor> listener) {
public void sql(SqlSettings sqlSettings, String sql, ActionListener<RowSet> listener) {
SqlSession session = newSession(sqlSettings);
session.executable(sql).execute(session, listener);
}
public void nextPage(Cursor cursor, ActionListener<RowSetCursor> listener) {
public void nextPage(Cursor cursor, ActionListener<RowSet> listener) {
cursor.nextPage(client, listener);
}
}

View File

@ -5,21 +5,21 @@
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.xpack.sql.session.AbstractRowSetCursor;
import org.elasticsearch.xpack.sql.session.AbstractRowSet;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.type.Schema;
import java.util.List;
import java.util.function.Supplier;
class AggsRowSetCursor extends AbstractRowSetCursor {
class AggsRowSet extends AbstractRowSet {
private int row = 0;
private final AggValues agg;
private final List<Supplier<Object>> columns;
AggsRowSetCursor(Schema schema, AggValues agg, List<Supplier<Object>> columns) {
super(schema, null);
AggsRowSet(Schema schema, AggValues agg, List<Supplier<Object>> columns) {
super(schema);
this.agg = agg;
this.columns = columns;
}

View File

@ -19,7 +19,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.Schema;
@ -103,7 +103,7 @@ public class ScrollCursor implements Cursor {
}
@Override
public void nextPage(Client client, ActionListener<RowSetCursor> listener) {
public void nextPage(Client client, ActionListener<RowSet> listener) {
// Fake the schema for now. We'll try to remove the need later.
List<String> names = new ArrayList<>(extractors.size());
List<DataType> dataTypes = new ArrayList<>(extractors.size());
@ -123,7 +123,7 @@ public class ScrollCursor implements Cursor {
client.searchScroll(request, ActionListener.wrap((SearchResponse response) -> {
int limitHits = -1; // NOCOMMIT do a thing with this
listener.onResponse(new SearchHitRowSetCursor(schema, extractors, response.getHits().getHits(),
limitHits, response.getScrollId(), null));
limitHits, response.getScrollId()));
}, listener::onFailure));
}

View File

@ -9,7 +9,6 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
@ -44,7 +43,7 @@ import org.elasticsearch.xpack.sql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.sql.querydsl.container.ScriptFieldRef;
import org.elasticsearch.xpack.sql.querydsl.container.SearchHitFieldRef;
import org.elasticsearch.xpack.sql.querydsl.container.TotalCountRef;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlSettings;
import org.elasticsearch.xpack.sql.type.Schema;
@ -53,7 +52,6 @@ import org.elasticsearch.xpack.sql.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
// TODO: add retry/back-off
public class Scroller {
@ -76,7 +74,7 @@ public class Scroller {
this.size = size;
}
public void scroll(Schema schema, QueryContainer query, String index, ActionListener<RowSetCursor> listener) {
public void scroll(Schema schema, QueryContainer query, String index, ActionListener<RowSet> listener) {
// prepare the request
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, size);
@ -89,28 +87,27 @@ public class Scroller {
boolean isAggsOnly = query.isAggsOnly();
ScrollerActionListener l = isAggsOnly ? new AggsScrollActionListener(listener, client, timeout, schema, query) : new HandshakeScrollActionListener(listener, client, timeout, schema, query);
ScrollerActionListener l;
if (isAggsOnly) {
l = new AggsScrollActionListener(listener, client, timeout, schema, query);
} else {
l = new HandshakeScrollActionListener(listener, client, timeout, schema, query);
}
client.search(search, l);
}
static void from(ActionListener<RowSetCursor> listener, SearchHitsActionListener previous, String scrollId, List<HitExtractor> ext) {
ScrollerActionListener l = new SessionScrollActionListener(listener, previous.client, previous.keepAlive, previous.schema, ext, previous.limit, previous.docsRead);
previous.client.searchScroll(new SearchScrollRequest(scrollId).scroll(previous.keepAlive), l);
}
// dedicated scroll used for aggs-only/group-by results
static class AggsScrollActionListener extends ScrollerActionListener {
private final QueryContainer query;
AggsScrollActionListener(ActionListener<RowSetCursor> listener, Client client, TimeValue keepAlive, Schema schema, QueryContainer query) {
AggsScrollActionListener(ActionListener<RowSet> listener, Client client, TimeValue keepAlive, Schema schema, QueryContainer query) {
super(listener, client, keepAlive, schema);
this.query = query;
}
@Override
protected RowSetCursor handleResponse(SearchResponse response) {
protected RowSet handleResponse(SearchResponse response) {
final List<Object[]> extractedAggs = new ArrayList<>();
AggValues aggValues = new AggValues(extractedAggs);
@ -151,7 +148,7 @@ public class Scroller {
aggValues.init(maxDepth, query.limit());
clearScroll(response.getScrollId());
return new AggsRowSetCursor(schema, aggValues, aggColumns);
return new AggsRowSet(schema, aggValues, aggColumns);
}
private Object[] extractAggValue(ColumnReference col, SearchResponse response) {
@ -209,12 +206,12 @@ public class Scroller {
}
// initial scroll used for parsing search hits (handles possible aggs)
static class HandshakeScrollActionListener extends SearchHitsActionListener {
static class HandshakeScrollActionListener extends ScrollerActionListener {
private final QueryContainer query;
HandshakeScrollActionListener(ActionListener<RowSetCursor> listener, Client client, TimeValue keepAlive, Schema schema, QueryContainer query) {
super(listener, client, keepAlive, schema, query.limit(), 0);
HandshakeScrollActionListener(ActionListener<RowSet> listener, Client client, TimeValue keepAlive,
Schema schema, QueryContainer query) {
super(listener, client, keepAlive, schema);
this.query = query;
}
@ -223,8 +220,48 @@ public class Scroller {
super.onResponse(response);
}
@Override
protected List<HitExtractor> getExtractors() {
protected RowSet handleResponse(SearchResponse response) {
SearchHit[] hits = response.getHits().getHits();
List<HitExtractor> exts = getExtractors();
// there are some results
if (hits.length > 0) {
String scrollId = response.getScrollId();
// if there's an id, try to setup next scroll
if (scrollId != null) {
// is all the content already retrieved?
if (Boolean.TRUE.equals(response.isTerminatedEarly()) || response.getHits().getTotalHits() == hits.length
// or maybe the limit has been reached
|| (hits.length >= query.limit() && query.limit() > -1)) {
// if so, clear the scroll
clearScroll(scrollId);
// and remove it to indicate no more data is expected
scrollId = null;
}
}
int limitHits = query.limit() > 0 && hits.length >= query.limit() ? query.limit() : -1;
return new SearchHitRowSetCursor(schema, exts, hits, limitHits, scrollId);
}
// no hits
else {
clearScroll(response.getScrollId());
// typically means last page but might be an aggs only query
return needsHit(exts) ? Rows.empty(schema) : new SearchHitRowSetCursor(schema, exts);
}
}
private static boolean needsHit(List<HitExtractor> exts) {
for (HitExtractor ext : exts) {
// Anything non-constant requires extraction
if (!(ext instanceof ConstantExtractor)) {
return true;
}
}
return false;
}
private List<HitExtractor> getExtractors() {
// create response extractors for the first time
List<ColumnReference> refs = query.columns();
@ -262,92 +299,15 @@ public class Scroller {
}
}
// listener used for streaming the rest of the results after the handshake has been used
static class SessionScrollActionListener extends SearchHitsActionListener {
private List<HitExtractor> exts;
SessionScrollActionListener(ActionListener<RowSetCursor> listener, Client client, TimeValue keepAlive, Schema schema, List<HitExtractor> ext, int limit, int docCount) {
super(listener, client, keepAlive, schema, limit, docCount);
this.exts = ext;
}
@Override
protected List<HitExtractor> getExtractors() {
return exts;
}
}
public abstract static class SearchHitsActionListener extends ScrollerActionListener {
final int limit;
int docsRead;
SearchHitsActionListener(ActionListener<RowSetCursor> listener, Client client, TimeValue keepAlive, Schema schema, int limit, int docsRead) {
super(listener, client, keepAlive, schema);
this.limit = limit;
this.docsRead = docsRead;
}
protected RowSetCursor handleResponse(SearchResponse response) {
SearchHit[] hits = response.getHits().getHits();
List<HitExtractor> exts = getExtractors();
// there are some results
if (hits.length > 0) {
String scrollId = response.getScrollId();
Consumer<ActionListener<RowSetCursor>> next = null;
docsRead += hits.length;
// if there's an id, try to setup next scroll
if (scrollId != null) {
// is all the content already retrieved?
if (Boolean.TRUE.equals(response.isTerminatedEarly()) || response.getHits().getTotalHits() == hits.length
// or maybe the limit has been reached
|| (docsRead >= limit && limit > -1)) {
// if so, clear the scroll
clearScroll(scrollId);
// and remove it to indicate no more data is expected
scrollId = null;
}
else {
next = l -> Scroller.from(l, this, response.getScrollId(), exts);
}
}
int limitHits = limit > 0 && docsRead >= limit ? limit : -1;
return new SearchHitRowSetCursor(schema, exts, hits, limitHits, scrollId, next);
}
// no hits
else {
clearScroll(response.getScrollId());
// typically means last page but might be an aggs only query
return needsHit(exts) ? Rows.empty(schema) : new SearchHitRowSetCursor(schema, exts);
}
}
private static boolean needsHit(List<HitExtractor> exts) {
for (HitExtractor ext : exts) {
// Anything non-constant requires extraction
if (!(ext instanceof ConstantExtractor)) {
return true;
}
}
return false;
}
protected abstract List<HitExtractor> getExtractors();
}
abstract static class ScrollerActionListener implements ActionListener<SearchResponse> {
final ActionListener<RowSetCursor> listener;
final ActionListener<RowSet> listener;
final Client client;
final TimeValue keepAlive;
final Schema schema;
ScrollerActionListener(ActionListener<RowSetCursor> listener, Client client, TimeValue keepAlive, Schema schema) {
ScrollerActionListener(ActionListener<RowSet> listener, Client client, TimeValue keepAlive, Schema schema) {
this.listener = listener;
this.client = client;
@ -369,7 +329,7 @@ public class Scroller {
}
}
protected abstract RowSetCursor handleResponse(SearchResponse response);
protected abstract RowSet handleResponse(SearchResponse response);
protected final void clearScroll(String scrollId) {
if (scrollId != null) {

View File

@ -5,28 +5,23 @@
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.sql.session.AbstractRowSetCursor;
import org.elasticsearch.xpack.sql.session.AbstractRowSet;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.type.Schema;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
//
// Since the results might contain nested docs, the iteration is similar to that of Aggregation
// namely it discovers the nested docs and then, for iteration, increments the deepest level first
// and eventually carries that over to the top level
public class SearchHitRowSetCursor extends AbstractRowSetCursor {
/**
* Extracts rows from an array of {@link SearchHit}.
*/
public class SearchHitRowSetCursor extends AbstractRowSet {
private final SearchHit[] hits;
private final String scrollId;
private final List<HitExtractor> extractors;
@ -38,15 +33,19 @@ public class SearchHitRowSetCursor extends AbstractRowSetCursor {
private int row = 0;
SearchHitRowSetCursor(Schema schema, List<HitExtractor> exts) {
this(schema, exts, SearchHits.EMPTY, -1, null, null);
this(schema, exts, SearchHits.EMPTY, -1, null);
}
SearchHitRowSetCursor(Schema schema, List<HitExtractor> exts, SearchHit[] hits, int limitHits, String scrollId, Consumer<ActionListener<RowSetCursor>> nextSet) {
super(schema, nextSet);
SearchHitRowSetCursor(Schema schema, List<HitExtractor> exts, SearchHit[] hits, int limitHits, String scrollId) {
super(schema);
this.hits = hits;
this.scrollId = scrollId;
this.extractors = exts;
// Since the results might contain nested docs, the iteration is similar to that of Aggregation
// namely it discovers the nested docs and then, for iteration, increments the deepest level first
// and eventually carries that over to the top level
String innerHit = null;
for (HitExtractor ex : exts) {
innerHit = ex.innerHitName();

View File

@ -8,7 +8,7 @@ package org.elasticsearch.xpack.sql.plan.logical;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.session.Executable;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location;
@ -41,7 +41,7 @@ public class LocalRelation extends LogicalPlan implements Executable {
}
@Override
public void execute(SqlSession session, ActionListener<RowSetCursor> listener) {
public void execute(SqlSession session, ActionListener<RowSet> listener) {
executable.execute(session, listener);
}

View File

@ -8,7 +8,7 @@ package org.elasticsearch.xpack.sql.plan.logical.command;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.sql.session.Executable;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location;
@ -26,9 +26,9 @@ public abstract class Command extends LogicalPlan implements Executable {
}
@Override
public void execute(SqlSession session, ActionListener<RowSetCursor> listener) {
public void execute(SqlSession session, ActionListener<RowSet> listener) {
listener.onResponse(execute(session));
}
protected abstract RowSetCursor execute(SqlSession session);
protected abstract RowSet execute(SqlSession session);
}

View File

@ -5,18 +5,13 @@
*/
package org.elasticsearch.xpack.sql.plan.logical.command;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.sql.rule.RuleExecutor.Batch;
import org.elasticsearch.xpack.sql.rule.RuleExecutor.ExecutionInfo;
import org.elasticsearch.xpack.sql.rule.RuleExecutor.Transformation;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location;
@ -25,6 +20,10 @@ import org.elasticsearch.xpack.sql.tree.NodeUtils;
import org.elasticsearch.xpack.sql.type.DataTypes;
import org.elasticsearch.xpack.sql.util.Graphviz;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import static java.util.Collections.singletonList;
@ -69,7 +68,7 @@ public class Debug extends Command {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected RowSetCursor execute(SqlSession session) {
protected RowSet execute(SqlSession session) {
String planString = null;
ExecutionInfo info = null;

View File

@ -11,7 +11,7 @@ import org.elasticsearch.xpack.sql.plan.QueryPlan;
import org.elasticsearch.xpack.sql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.sql.planner.Planner;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location;
@ -71,7 +71,7 @@ public class Explain extends Command {
}
@Override
protected RowSetCursor execute(SqlSession session) {
protected RowSet execute(SqlSession session) {
String planString = null;
String planName = "Parsed";

View File

@ -5,21 +5,21 @@
*/
package org.elasticsearch.xpack.sql.plan.logical.command;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.DataTypes;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;
import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toList;
@ -48,7 +48,8 @@ public class SessionReset extends Command {
}
@Override
protected RowSetCursor execute(SqlSession session) {
protected RowSet execute(SqlSession session) {
// NOCOMMIT this isn't likely to work any more. None of the session stuff is.
session.updateSettings(s -> {
Settings defaults = session.defaults().cfg();
Builder builder = Settings.builder().put(s);

View File

@ -5,18 +5,18 @@
*/
package org.elasticsearch.xpack.sql.plan.logical.command;
import java.util.List;
import java.util.Objects;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.DataTypes;
import java.util.List;
import java.util.Objects;
import static java.util.Collections.singletonList;
public class SessionSet extends Command {
@ -44,7 +44,7 @@ public class SessionSet extends Command {
}
@Override
protected RowSetCursor execute(SqlSession session) {
protected RowSet execute(SqlSession session) {
session.updateSettings(s -> {
return Settings.builder().put(s).put(key, value).build();
});

View File

@ -9,7 +9,7 @@ import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.analysis.catalog.EsIndex;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location;
@ -45,7 +45,7 @@ public class ShowColumns extends Command {
}
@Override
protected RowSetCursor execute(SqlSession session) {
protected RowSet execute(SqlSession session) {
List<List<?>> rows = new ArrayList<>();
EsIndex fetched;
try {

View File

@ -5,20 +5,20 @@
*/
package org.elasticsearch.xpack.sql.plan.logical.command;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
import org.elasticsearch.xpack.sql.expression.function.FunctionDefinition;
import org.elasticsearch.xpack.sql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.DataTypes;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toList;
@ -42,7 +42,7 @@ public class ShowFunctions extends Command {
}
@Override
protected RowSetCursor execute(SqlSession session) {
protected RowSet execute(SqlSession session) {
FunctionRegistry registry = session.functionRegistry();
Collection<FunctionDefinition> functions = registry.listFunctions(pattern);

View File

@ -5,16 +5,16 @@
*/
package org.elasticsearch.xpack.sql.plan.logical.command;
import java.util.List;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.DataTypes;
import java.util.List;
import static java.util.Collections.singletonList;
public class ShowSchemas extends Command {
@ -29,7 +29,7 @@ public class ShowSchemas extends Command {
}
@Override
protected RowSetCursor execute(SqlSession session) {
protected RowSet execute(SqlSession session) {
return Rows.empty(output());
}

View File

@ -5,20 +5,20 @@
*/
package org.elasticsearch.xpack.sql.plan.logical.command;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.type.DataTypes;
import org.elasticsearch.xpack.sql.util.StringUtils;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
@ -48,7 +48,7 @@ public class ShowSession extends Command {
}
@Override
protected RowSetCursor execute(SqlSession session) {
protected RowSet execute(SqlSession session) {
List<List<?>> out;
Settings s = session.settings().cfg();

View File

@ -11,7 +11,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.expression.RootFieldAttribute;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location;
@ -45,7 +45,7 @@ public class ShowTables extends Command {
}
@Override
public final void execute(SqlSession session, ActionListener<RowSetCursor> listener) {
public final void execute(SqlSession session, ActionListener<RowSet> listener) {
String pattern = Strings.hasText(this.pattern) ? StringUtils.jdbcToEsPattern(this.pattern) : "*";
session.getIndices(new String[] {pattern}, IndicesOptions.lenientExpandOpen(), ActionListener.wrap(result -> {
listener.onResponse(Rows.of(output(), result.stream()
@ -55,7 +55,7 @@ public class ShowTables extends Command {
}
@Override
protected RowSetCursor execute(SqlSession session) {
protected RowSet execute(SqlSession session) {
throw new UnsupportedOperationException("No synchronous exec");
}

View File

@ -5,15 +5,15 @@
*/
package org.elasticsearch.xpack.sql.plan.physical;
import java.util.List;
import java.util.Objects;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.plan.logical.command.Command;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import java.util.List;
import java.util.Objects;
public class CommandExec extends LeafExec {
private final Command command;
@ -28,7 +28,7 @@ public class CommandExec extends LeafExec {
}
@Override
public void execute(SqlSession session, ActionListener<RowSetCursor> listener) {
public void execute(SqlSession session, ActionListener<RowSet> listener) {
command.execute(session, listener);
}

View File

@ -9,7 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.execution.search.Scroller;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location;
@ -49,7 +49,7 @@ public class EsQueryExec extends LeafExec {
}
@Override
public void execute(SqlSession session, ActionListener<RowSetCursor> listener) {
public void execute(SqlSession session, ActionListener<RowSet> listener) {
Scroller scroller = new Scroller(session.client(), session.settings());
scroller.scroll(Rows.schema(output), queryContainer, index, listener);
}

View File

@ -9,7 +9,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.expression.Attribute;
import org.elasticsearch.xpack.sql.session.EmptyExecutable;
import org.elasticsearch.xpack.sql.session.Executable;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import org.elasticsearch.xpack.sql.tree.Location;
@ -39,7 +39,7 @@ public class LocalExec extends LeafExec {
}
@Override
public void execute(SqlSession session, ActionListener<RowSetCursor> listener) {
public void execute(SqlSession session, ActionListener<RowSet> listener) {
executable.execute(session, listener);
}

View File

@ -5,20 +5,20 @@
*/
package org.elasticsearch.xpack.sql.plan.physical;
import java.util.Locale;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.planner.PlanningException;
import org.elasticsearch.xpack.sql.session.Executable;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;
import java.util.Locale;
import static java.lang.String.format;
// this is mainly a marker interface to validate a plan before being executed
public interface Unexecutable extends Executable {
default void execute(SqlSession session, ActionListener<RowSetCursor> listener) {
default void execute(SqlSession session, ActionListener<RowSet> listener) {
throw new PlanningException(format(Locale.ROOT, "Current plan %s is not executable", this));
}
}

View File

@ -17,7 +17,7 @@ import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.plugin.SqlLicenseChecker;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse.ColumnInfo;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.RowSetCursor;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SqlSettings;
import org.elasticsearch.xpack.sql.type.Schema;
@ -64,7 +64,7 @@ public class TransportSqlAction extends HandledTransportAction<SqlRequest, SqlRe
}
}
static SqlResponse createResponse(boolean includeColumnMetadata, RowSetCursor cursor) {
static SqlResponse createResponse(boolean includeColumnMetadata, RowSet cursor) {
List<ColumnInfo> columns = null;
if (includeColumnMetadata) {
columns = new ArrayList<>(cursor.schema().types().size());

View File

@ -5,25 +5,19 @@
*/
package org.elasticsearch.xpack.sql.session;
import java.util.function.Consumer;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.type.Schema;
import org.elasticsearch.xpack.sql.util.Assert;
public abstract class AbstractRowSetCursor implements RowSetCursor {
public abstract class AbstractRowSet implements RowSet {
private final Schema schema;
private final int size;
private final Consumer<ActionListener<RowSetCursor>> nextSet;
private boolean terminated = false;
protected AbstractRowSetCursor(Schema schema, Consumer<ActionListener<RowSetCursor>> nextSet) {
protected AbstractRowSet(Schema schema) {
this.schema = schema;
this.size = schema().names().size();
this.nextSet = nextSet;
}
@Override
@ -65,21 +59,6 @@ public abstract class AbstractRowSetCursor implements RowSetCursor {
protected abstract void doReset();
@Override
public boolean hasNextSet() {
return nextSet != null;
}
@Override
public void nextSet(ActionListener<RowSetCursor> listener) {
if (nextSet != null) {
nextSet.accept(listener);
}
else {
listener.onResponse(null);
}
}
@Override
public int rowSize() {
return size;

View File

@ -27,7 +27,7 @@ public interface Cursor extends NamedWriteable {
/**
* Request the next page of data.
*/
void nextPage(Client client, ActionListener<RowSetCursor> listener);
void nextPage(Client client, ActionListener<RowSet> listener);
/**
* Write the {@linkplain Cursor} to a String for serialization over xcontent.
*/

View File

@ -35,7 +35,7 @@ class EmptyCursor implements Cursor {
}
@Override
public void nextPage(Client client, ActionListener<RowSetCursor> listener) {
public void nextPage(Client client, ActionListener<RowSet> listener) {
throw new IllegalArgumentException("there is no next page");
}

View File

@ -5,11 +5,11 @@
*/
package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.xpack.sql.expression.Attribute;
import java.util.List;
import java.util.Objects;
import org.elasticsearch.xpack.sql.expression.Attribute;
public class EmptyExecutable implements Executable {
private final List<Attribute> output;
@ -24,7 +24,7 @@ public class EmptyExecutable implements Executable {
}
@Override
public void execute(SqlSession session, org.elasticsearch.action.ActionListener<RowSetCursor> listener) {
public void execute(SqlSession session, org.elasticsearch.action.ActionListener<RowSet> listener) {
listener.onResponse(Rows.empty(output));
}

View File

@ -7,10 +7,10 @@ package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.xpack.sql.type.Schema;
class EmptyRowSetCursor extends AbstractRowSetCursor {
class EmptyRowSetCursor extends AbstractRowSet {
EmptyRowSetCursor(Schema schema) {
super(schema, null);
super(schema);
}
@Override

View File

@ -14,5 +14,5 @@ public interface Executable {
List<Attribute> output();
void execute(SqlSession session, ActionListener<RowSetCursor> listener);
void execute(SqlSession session, ActionListener<RowSet> listener);
}

View File

@ -9,13 +9,13 @@ import org.elasticsearch.xpack.sql.type.Schema;
import java.util.List;
class ListRowSetCursor extends AbstractRowSetCursor {
class ListRowSetCursor extends AbstractRowSet {
private final List<List<?>> list;
private int pos = 0;
ListRowSetCursor(Schema schema, List<List<?>> list) {
super(schema, null);
super(schema);
this.list = list;
}

View File

@ -11,12 +11,8 @@ import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import java.util.function.Consumer;
/**
* Interface representing a set of rows produced by the SQL engine. Builds on top of a RowView to _prevent_
* a view object from being instantiated or returned.
* In other words to enforce immediate consumption (before moving forward).
*
* If (when) joins and such will be enabled, this interface would have to be retro-fitted
* to become even more lazy (so that things like number of entries) would not be known.
* A set of rows to be returned at one time and a way
* to get the next set of rows.
*/
public interface RowSet extends RowView {
@ -29,7 +25,7 @@ public interface RowSet extends RowView {
void reset();
/**
* Return the key used by {@link PlanExecutor#nextPage(Cursor, ActionListener)} to fetch the next page.
* The key used by {@link PlanExecutor#nextPage(Cursor, ActionListener)} to fetch the next page.
*/
Cursor nextPageCursor();

View File

@ -1,38 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.sql.SqlException;
import java.util.Objects;
import java.util.function.Consumer;
public interface RowSetCursor extends RowSet {
boolean hasNextSet();
void nextSet(ActionListener<RowSetCursor> listener);
default void forEachSet(Consumer<? super RowSet> action) {
Objects.requireNonNull(action);
action.accept(this);
if (hasNextSet()) {
nextSet(new ActionListener<RowSetCursor>() {
@Override
public void onResponse(RowSetCursor cursor) {
forEachSet(action);
}
@Override
public void onFailure(Exception ex) {
throw ex instanceof RuntimeException ? (RuntimeException) ex : new SqlException(ex);
}
});
}
}
}

View File

@ -5,14 +5,13 @@
*/
package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.xpack.sql.type.Schema;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.elasticsearch.xpack.sql.type.Schema;
/**
* A view into a row.
* Offers access to the data but it shouldn't be held since it is not a data container.

View File

@ -50,7 +50,7 @@ public abstract class Rows {
return new Schema(asList(n1, n2, n3, n4, n5), asList(t1, t2, t3, t4, t5));
}
public static RowSetCursor of(List<Attribute> attrs, List<List<?>> values) {
public static RowSet of(List<Attribute> attrs, List<List<?>> values) {
if (values.isEmpty()) {
return empty(attrs);
}
@ -63,16 +63,16 @@ public abstract class Rows {
return new ListRowSetCursor(schema, values);
}
public static RowSetCursor singleton(List<Attribute> attrs, Object... values) {
public static RowSet singleton(List<Attribute> attrs, Object... values) {
Assert.isTrue(attrs.size() == values.length, "Schema %s and values %s are out of sync", attrs, values);
return new SingletonRowSet(schema(attrs), values);
}
public static RowSetCursor empty(Schema schema) {
public static RowSet empty(Schema schema) {
return new EmptyRowSetCursor(schema);
}
public static RowSetCursor empty(List<Attribute> attrs) {
public static RowSet empty(List<Attribute> attrs) {
return new EmptyRowSetCursor(schema(attrs));
}
}

View File

@ -28,7 +28,7 @@ public class SingletonExecutable implements Executable {
}
@Override
public void execute(SqlSession session, ActionListener<RowSetCursor> listener) {
public void execute(SqlSession session, ActionListener<RowSet> listener) {
listener.onResponse(Rows.singleton(output, values));
}

View File

@ -7,12 +7,12 @@ package org.elasticsearch.xpack.sql.session;
import org.elasticsearch.xpack.sql.type.Schema;
class SingletonRowSet extends AbstractRowSetCursor { // NOCOMMIT is it worth keeping this when we have ListRowSet?
class SingletonRowSet extends AbstractRowSet { // NOCOMMIT is it worth keeping this when we have ListRowSet?
private final Object[] values;
SingletonRowSet(Schema schema, Object[] values) {
super(schema, null);
super(schema);
this.values = values;
}

View File

@ -136,7 +136,7 @@ public class SqlSession {
}
}
public void sql(String sql, ActionListener<RowSetCursor> listener) {
public void sql(String sql, ActionListener<RowSet> listener) {
executable(sql).execute(this, listener);
}
@ -153,7 +153,7 @@ public class SqlSession {
return settings;
}
public void execute(PhysicalPlan plan, ActionListener<RowSetCursor> listener) {
public void execute(PhysicalPlan plan, ActionListener<RowSet> listener) {
plan.execute(this, listener);
}
}