Mirror of https://github.com/honeymoose/OpenSearch.git
Commit 1a2cbea747 (parent 6de905c2e8)
@@ -967,7 +967,7 @@ public class Analyzer extends RuleExecutor<LogicalPlan> {
        }
    }

-    static abstract class AnalyzeRule<SubPlan extends LogicalPlan> extends Rule<SubPlan, LogicalPlan> {
+    abstract static class AnalyzeRule<SubPlan extends LogicalPlan> extends Rule<SubPlan, LogicalPlan> {

        // transformUp (post-order) - that is first children and then the node
        // but with a twist; only if the tree is not resolved or analyzed
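For context: the recurring `static abstract` to `abstract static` reordering in this commit follows the modifier order recommended by the Java Language Specification and enforced by tools such as Checkstyle's ModifierOrder check. A minimal sketch of the convention, with purely illustrative names not taken from this codebase:

// Hypothetical example; names are illustrative only.
public class ModifierOrderExample {

    // "static abstract class Rule" would be flagged by Checkstyle's ModifierOrder;
    // the recommended order is: visibility, abstract, static, final, ...
    abstract static class Rule {
        abstract void apply();
    }

    static final class NoopRule extends Rule {
        @Override
        void apply() {
            // intentionally does nothing
        }
    }
}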
@@ -87,273 +87,274 @@ public class Scroller {
            ScrollerActionListener l = new SessionScrollActionListener(listener, previous.client, previous.keepAlive, previous.schema, ext, previous.limit, previous.docsRead);
            previous.client.searchScroll(new SearchScrollRequest(scrollId).scroll(previous.keepAlive), l);
        }
    }

    // dedicated scroll used for aggs-only/group-by results
-    class AggsScrollActionListener extends ScrollerActionListener {
+    static class AggsScrollActionListener extends ScrollerActionListener {

        private final QueryContainer query;

        AggsScrollActionListener(ActionListener<RowSetCursor> listener, Client client, TimeValue keepAlive, Schema schema, QueryContainer query) {
            super(listener, client, keepAlive, schema);
            this.query = query;
        }

        @Override
        protected RowSetCursor handleResponse(SearchResponse response) {
            Aggregations aggs = response.getAggregations();

            List<Object[]> columns = new ArrayList<>();

            // this method assumes the nested aggregation are all part of the same tree (the SQL group-by)
            int maxDepth = -1;

            for (Reference ref : query.refs()) {
                Object[] arr = null;

                ColumnProcessor processor = null;

                if (ref instanceof ProcessingRef) {
                    ProcessingRef pRef = (ProcessingRef) ref;
                    processor = pRef.processor();
                    ref = pRef.ref();
                }

                if (ref == TotalCountRef.INSTANCE) {
                    arr = new Object[] { processIfNeeded(processor, Long.valueOf(response.getHits().getTotalHits())) };
                    columns.add(arr);
                }
                else if (ref instanceof AggRef) {
                    // workaround for elastic/elasticsearch/issues/23056
                    String path = ((AggRef) ref).path();
                    boolean formattedKey = AggPath.isBucketValueFormatted(path);
                    if (formattedKey) {
                        path = AggPath.bucketValueWithoutFormat(path);
                    }
                    Object value = getAggProperty(aggs, path);

                    // // FIXME: this can be tabular in nature
                    // if (ref instanceof MappedAggRef) {
                    //     Map<String, Object> map = (Map<String, Object>) value;
                    //     Object extractedValue = map.get(((MappedAggRef) ref).fieldName());
                    // }

                    if (formattedKey) {
                        List<? extends Bucket> buckets = ((MultiBucketsAggregation) value).getBuckets();
                        arr = new Object[buckets.size()];
                        for (int i = 0; i < buckets.size(); i++) {
                            arr[i] = buckets.get(i).getKeyAsString();
                        }
                    }
                    else {
                        arr = value instanceof Object[] ? (Object[]) value : new Object[] { value };
                    }

                    // process if needed
                    for (int i = 0; i < arr.length; i++) {
                        arr[i] = processIfNeeded(processor, arr[i]);
                    }
                    columns.add(arr);
                }
                // aggs without any grouping
                else {
                    throw new SqlIllegalArgumentException("Unexpected non-agg/grouped column specified; %s", ref.getClass());
                }

                if (ref.depth() > maxDepth) {
                    maxDepth = ref.depth();
                }
            }

            clearScroll(response.getScrollId());
            return new AggsRowSetCursor(schema, columns, maxDepth, query.limit());
        }

        private static Object getAggProperty(Aggregations aggs, String path) {
            List<String> list = AggregationPath.parse(path).getPathElementsAsStringList();
            String aggName = list.get(0);
            InternalAggregation agg = aggs.get(aggName);
            if (agg == null) {
                throw new ExecutionException("Cannot find an aggregation named %s", aggName);
            }
            return agg.getProperty(list.subList(1, list.size()));
        }

        private Object processIfNeeded(ColumnProcessor processor, Object value) {
            return processor != null ? processor.apply(value) : value;
        }
    }

    // initial scroll used for parsing search hits (handles possible aggs)
    class HandshakeScrollActionListener extends SearchHitsActionListener {

        private final QueryContainer query;

        HandshakeScrollActionListener(ActionListener<RowSetCursor> listener, Client client, TimeValue keepAlive, Schema schema, QueryContainer query) {
            super(listener, client, keepAlive, schema, query.limit(), 0);
            this.query = query;
        }

        @Override
        public void onResponse(SearchResponse response) {
            super.onResponse(response);
        }

        @Override
        protected List<HitExtractor> getExtractors() {
            // create response extractors for the first time
            List<Reference> refs = query.refs();

            List<HitExtractor> exts = new ArrayList<>(refs.size());

            for (Reference ref : refs) {
                exts.add(createExtractor(ref));
            }
            return exts;
        }

        private HitExtractor createExtractor(Reference ref) {
            if (ref instanceof SearchHitFieldRef) {
                SearchHitFieldRef f = (SearchHitFieldRef) ref;
                return f.useDocValue() ? new DocValueExtractor(f.name()) : new SourceExtractor(f.name());
            }

            if (ref instanceof NestedFieldRef) {
                NestedFieldRef f = (NestedFieldRef) ref;
                return new InnerHitExtractor(f.parent(), f.name(), f.useDocValue());
            }

            if (ref instanceof ScriptFieldRef) {
                ScriptFieldRef f = (ScriptFieldRef) ref;
                return new DocValueExtractor(f.name());
            }

            if (ref instanceof ProcessingRef) {
                ProcessingRef pRef = (ProcessingRef) ref;
                return new ProcessingHitExtractor(createExtractor(pRef.ref()), pRef.processor());
            }

            throw new SqlIllegalArgumentException("Unexpected ValueReference %s", ref.getClass());
        }
    }

    // listener used for streaming the rest of the results after the handshake has been used
-    class SessionScrollActionListener extends SearchHitsActionListener {
+    static class SessionScrollActionListener extends SearchHitsActionListener {

        private List<HitExtractor> exts;

        SessionScrollActionListener(ActionListener<RowSetCursor> listener, Client client, TimeValue keepAlive, Schema schema, List<HitExtractor> ext, int limit, int docCount) {
            super(listener, client, keepAlive, schema, limit, docCount);
            this.exts = ext;
        }

        @Override
        protected List<HitExtractor> getExtractors() {
            return exts;
        }
    }

-    abstract class SearchHitsActionListener extends ScrollerActionListener {
+    abstract static class SearchHitsActionListener extends ScrollerActionListener {

        final int limit;
        int docsRead;

        SearchHitsActionListener(ActionListener<RowSetCursor> listener, Client client, TimeValue keepAlive, Schema schema, int limit, int docsRead) {
            super(listener, client, keepAlive, schema);
            this.limit = limit;
            this.docsRead = docsRead;
        }

        protected RowSetCursor handleResponse(SearchResponse response) {
            SearchHit[] hits = response.getHits().getHits();
            List<HitExtractor> exts = getExtractors();

            // there are some results
            if (hits.length > 0) {
                String scrollId = response.getScrollId();
                Consumer<ActionListener<RowSetCursor>> next = null;

                docsRead += hits.length;

                // if there's an id, try to setup next scroll
                if (scrollId != null) {
                    // is all the content already retrieved?
                    if (Boolean.TRUE.equals(response.isTerminatedEarly()) || response.getHits().getTotalHits() == hits.length
                            // or maybe the limit has been reached
                            || docsRead >= limit) {
                        // if so, clear the scroll
                        clearScroll(scrollId);
                        // and remove it to indicate no more data is expected
                        scrollId = null;
                    }
                    else {
                        next = l -> Scroller.from(l, this, response.getScrollId(), exts);
                    }
                }
                int limitHits = limit > 0 && docsRead >= limit ? limit : -1;
                return new SearchHitRowSetCursor(schema, exts, hits, limitHits, scrollId, next);
            }
            // no hits
            else {
                clearScroll(response.getScrollId());
                // typically means last page but might be an aggs only query
                return needsHit(exts) ? Rows.empty(schema) : new SearchHitRowSetCursor(schema, exts);
            }
        }

        private static boolean needsHit(List<HitExtractor> exts) {
            for (HitExtractor ext : exts) {
                if (ext instanceof DocValueExtractor || ext instanceof ProcessingHitExtractor) {
                    return true;
                }
            }
            return false;
        }

        protected abstract List<HitExtractor> getExtractors();
    }

-    abstract class ScrollerActionListener implements ActionListener<SearchResponse> {
+    abstract static class ScrollerActionListener implements ActionListener<SearchResponse> {

        final ActionListener<RowSetCursor> listener;

        final Client client;
        final TimeValue keepAlive;
        final Schema schema;

        ScrollerActionListener(ActionListener<RowSetCursor> listener, Client client, TimeValue keepAlive, Schema schema) {
            this.listener = listener;

            this.client = client;
            this.keepAlive = keepAlive;
            this.schema = schema;
        }

        // TODO: need to handle rejections plus check failures (shard size, etc...)
        @Override
        public void onResponse(final SearchResponse response) {
            try {
                ShardSearchFailure[] failure = response.getShardFailures();
                if (!ObjectUtils.isEmpty(failure)) {
                    onFailure(new ExecutionException(failure[0].reason(), failure[0].getCause()));
                }
                listener.onResponse(handleResponse(response));
            } catch (Exception ex) {
                onFailure(ex);
            }
        }

        protected abstract RowSetCursor handleResponse(SearchResponse response);

        protected final void clearScroll(String scrollId) {
            if (scrollId != null) {
                // fire and forget
                client.prepareClearScroll().addScrollId(scrollId).execute();
            }
        }

        @Override
        public final void onFailure(Exception ex) {
            listener.onFailure(ex);
        }
    }
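For context: making these listener classes static means they no longer carry an implicit reference to the enclosing instance; everything they use is passed in through their constructors, as the code above already does. A minimal sketch of the difference, with hypothetical names not taken from this codebase:

// Hypothetical example; names are illustrative only.
public class Outer {

    private final String state = "outer state";

    class Inner {
        // an inner class can silently reach Outer.this
        String read() {
            return state;
        }
    }

    static class Nested {
        private final String state;

        // a static nested class gets its dependencies handed over explicitly
        Nested(String state) {
            this.state = state;
        }

        String read() {
            return state;
        }
    }

    public static void main(String[] args) {
        Outer outer = new Outer();
        System.out.println(outer.new Inner().read());        // needs an Outer instance
        System.out.println(new Nested("own state").read());  // does not
    }
}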
@@ -14,7 +14,7 @@ import org.elasticsearch.xpack.sql.type.DataType;

abstract class UnresolvedNamedExpression extends NamedExpression implements Unresolvable {

-    public UnresolvedNamedExpression(Location location, List<Expression> children) {
+    UnresolvedNamedExpression(Location location, List<Expression> children) {
        super(location, "<unresolved>", children, ExpressionIdGenerator.EMPTY);
    }

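For context: dropping public here is a visibility cleanup; the class itself is package-private, so a public constructor grants no extra access, and only subclasses call an abstract class's constructor anyway. A small sketch under that assumption, with illustrative names:

// Hypothetical example; names are illustrative only.
abstract class PackagePrivateBase {

    private final String name;

    // package-private is enough: code outside this package cannot see the class,
    // so a public constructor would not widen access in practice
    PackagePrivateBase(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }
}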
@@ -16,7 +16,7 @@ public enum FunctionType {

    private final Class<? extends Function> baseClass;

-    private FunctionType(Class<? extends Function> base) {
+    FunctionType(Class<? extends Function> base) {
        this.baseClass = base;
    }

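For context: an enum constructor is implicitly private, so the explicit private removed here was redundant (the kind of thing Checkstyle's RedundantModifier reports). A minimal sketch with hypothetical names:

// Hypothetical example; names are illustrative only.
public enum Color {
    RED("#ff0000"),
    GREEN("#00ff00");

    private final String hex;

    // no access modifier needed: enum constructors can only be invoked by the
    // constant declarations above, never with 'new'
    Color(String hex) {
        this.hex = hex;
    }

    public String hex() {
        return hex;
    }
}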
@@ -16,7 +16,7 @@ import org.elasticsearch.xpack.sql.type.DataTypes;

public class FullTextPredicate extends Expression {

-    public static enum Operator {
+    public enum Operator {
        AND,
        OR;

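For context: a nested enum is implicitly static, so the static in public static enum Operator said nothing that public enum does not. A minimal sketch with hypothetical names:

// Hypothetical example; names are illustrative only.
public class Predicate {

    // implicitly static; no enclosing Predicate instance is captured
    public enum Operator {
        AND,
        OR
    }

    public static void main(String[] args) {
        // usable without ever creating a Predicate
        System.out.println(Operator.AND);
    }
}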
@@ -947,7 +947,7 @@ public class Optimizer extends RuleExecutor<LogicalPlan> {
    }


-    static abstract class OptimizerRule<SubPlan extends LogicalPlan> extends Rule<SubPlan, LogicalPlan> {
+    abstract static class OptimizerRule<SubPlan extends LogicalPlan> extends Rule<SubPlan, LogicalPlan> {

        private final boolean transformDown;

@@ -969,7 +969,7 @@ public class Optimizer extends RuleExecutor<LogicalPlan> {
        protected abstract LogicalPlan rule(SubPlan plan);
    }

-    static abstract class OptimizerExpressionUpRule extends Rule<LogicalPlan, LogicalPlan> {
+    abstract static class OptimizerExpressionUpRule extends Rule<LogicalPlan, LogicalPlan> {

        private final boolean transformDown;

@@ -61,8 +61,6 @@ import static org.elasticsearch.xpack.sql.planner.QueryTranslator.toQuery;
import static org.elasticsearch.xpack.sql.util.CollectionUtils.combine;

class QueryFolder extends RuleExecutor<PhysicalPlan> {
-
-
    PhysicalPlan fold(PhysicalPlan plan) {
        return execute(plan);
    }
@@ -501,16 +499,16 @@ class QueryFolder extends RuleExecutor<PhysicalPlan> {
                return exec.with(qContainer);
            }
        }
    }

    // rule for folding physical plans together
-    abstract class FoldingRule<SubPlan extends PhysicalPlan> extends Rule<SubPlan, PhysicalPlan> {
+    abstract static class FoldingRule<SubPlan extends PhysicalPlan> extends Rule<SubPlan, PhysicalPlan> {

        @Override
        public final PhysicalPlan apply(PhysicalPlan plan) {
            return plan.transformUp(this::rule, typeToken());
        }

        @Override
        protected abstract PhysicalPlan rule(SubPlan plan);
    }
@@ -697,7 +697,7 @@ abstract class QueryTranslator {
    //
    // Agg translators
    //

    static class DistinctCounts extends SingleValueAggTranslator<Count> {

        @Override
@@ -708,7 +708,7 @@ abstract class QueryTranslator {
            return new CardinalityAgg(id, path, field(c));
        }
    }

    static class Sums extends SingleValueAggTranslator<Sum> {

        @Override
@@ -716,7 +716,7 @@ abstract class QueryTranslator {
            return new SumAgg(id, path, field(s));
        }
    }

    static class Avgs extends SingleValueAggTranslator<Avg> {

        @Override
@@ -724,7 +724,7 @@ abstract class QueryTranslator {
            return new AvgAgg(id, path, field(a));
        }
    }

    static class Maxes extends SingleValueAggTranslator<Max> {

        @Override
@@ -732,7 +732,7 @@ abstract class QueryTranslator {
            return new MaxAgg(id, path, field(m));
        }
    }

    static class Mins extends SingleValueAggTranslator<Min> {

        @Override
@@ -740,7 +740,7 @@ abstract class QueryTranslator {
            return new MinAgg(id, path, field(m));
        }
    }

    static class StatsAggs extends CompoundAggTranslator<Stats> {

        @Override
@@ -748,7 +748,7 @@ abstract class QueryTranslator {
            return new StatsAgg(id, path, field(s));
        }
    }

    static class ExtendedStatsAggs extends CompoundAggTranslator<ExtendedStats> {

        @Override
@@ -756,7 +756,7 @@ abstract class QueryTranslator {
            return new ExtendedStatsAgg(id, path, field(e));
        }
    }

    static class MatrixStatsAggs extends CompoundAggTranslator<MatrixStats> {

        @Override
@@ -764,7 +764,7 @@ abstract class QueryTranslator {
            return new MatrixStatsAgg(id, path, singletonList(field(m)));
        }
    }

    static class DateTimes extends SingleValueAggTranslator<Min> {

        @Override
@@ -772,8 +772,8 @@ abstract class QueryTranslator {
            return new MinAgg(id, path, field(m));
        }
    }

-    static abstract class AggTranslator<F extends Function> {
+    abstract static class AggTranslator<F extends Function> {

        private final Class<F> typeToken = ReflectionUtils.detectSuperTypeForRuleLike(getClass());

@@ -784,8 +784,8 @@ abstract class QueryTranslator {

        protected abstract LeafAgg asAgg(String id, String parent, F f);
    }

-    static abstract class SingleValueAggTranslator<F extends Function> extends AggTranslator<F> {
+    abstract static class SingleValueAggTranslator<F extends Function> extends AggTranslator<F> {

        @Override
        protected final LeafAgg asAgg(String id, String parent, F function) {
@@ -795,8 +795,8 @@ abstract class QueryTranslator {

        protected abstract LeafAgg toAgg(String id, String path, F f);
    }

-    static abstract class CompoundAggTranslator<C extends CompoundAggregate> extends AggTranslator<C> {
+    abstract static class CompoundAggTranslator<C extends CompoundAggregate> extends AggTranslator<C> {

        @Override
        protected final LeafAgg asAgg(String id, String parent, C function) {
@@ -806,9 +806,8 @@ abstract class QueryTranslator {

        protected abstract LeafAgg toAgg(String id, String path, C f);
    }

-
-    static abstract class ExppressionTranslator<E extends Expression> {
+    abstract static class ExppressionTranslator<E extends Expression> {

        private final Class<E> typeToken = ReflectionUtils.detectSuperTypeForRuleLike(getClass());
