EQL: Add infra for planning and query folding (#52065)
Actual folding not yet in place (TBD) (cherry picked from commit d52b96f273a94c90e475a5035cd57baa086fb0c0)
This commit is contained in:
parent
e098e837f7
commit
26900bfb05
|
@ -6,6 +6,7 @@
|
|||
|
||||
package org.elasticsearch.xpack.eql.analysis;
|
||||
|
||||
import org.elasticsearch.xpack.eql.common.Failure;
|
||||
import org.elasticsearch.xpack.ql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.ql.expression.NamedExpression;
|
||||
import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute;
|
||||
|
|
|
@ -7,23 +7,14 @@ package org.elasticsearch.xpack.eql.analysis;
|
|||
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.xpack.eql.EqlClientException;
|
||||
import org.elasticsearch.xpack.ql.tree.Location;
|
||||
import org.elasticsearch.xpack.ql.util.StringUtils;
|
||||
import org.elasticsearch.xpack.eql.common.Failure;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class VerificationException extends EqlClientException {
|
||||
|
||||
protected VerificationException(Collection<Failure> sources) {
|
||||
super(asMessage(sources));
|
||||
}
|
||||
|
||||
private static String asMessage(Collection<Failure> failures) {
|
||||
return failures.stream().map(f -> {
|
||||
Location l = f.node().source().source();
|
||||
return "line " + l.getLineNumber() + ":" + l.getColumnNumber() + ": " + f.message();
|
||||
}).collect(Collectors.joining(StringUtils.NEW_LINE, "Found " + failures.size() + " problem(s)\n", StringUtils.EMPTY));
|
||||
super(Failure.failMessage(sources));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
|
||||
package org.elasticsearch.xpack.eql.analysis;
|
||||
|
||||
import org.elasticsearch.xpack.eql.common.Failure;
|
||||
import org.elasticsearch.xpack.ql.capabilities.Unresolvable;
|
||||
import org.elasticsearch.xpack.ql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute;
|
||||
|
@ -22,7 +23,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
|
||||
import static java.util.stream.Collectors.toMap;
|
||||
import static org.elasticsearch.xpack.eql.analysis.Failure.fail;
|
||||
import static org.elasticsearch.xpack.eql.common.Failure.fail;
|
||||
|
||||
/**
|
||||
* The verifier has the role of checking the analyzed tree for failures and build a list of failures following this check.
|
||||
|
|
|
@ -4,29 +4,33 @@
|
|||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.eql.analysis;
|
||||
package org.elasticsearch.xpack.eql.common;
|
||||
|
||||
import org.elasticsearch.xpack.ql.tree.Location;
|
||||
import org.elasticsearch.xpack.ql.tree.Node;
|
||||
import org.elasticsearch.xpack.ql.util.StringUtils;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
|
||||
|
||||
class Failure {
|
||||
public class Failure {
|
||||
|
||||
private final Node<?> node;
|
||||
private final String message;
|
||||
|
||||
Failure(Node<?> node, String message) {
|
||||
public Failure(Node<?> node, String message) {
|
||||
this.node = node;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
Node<?> node() {
|
||||
public Node<?> node() {
|
||||
return node;
|
||||
}
|
||||
|
||||
String message() {
|
||||
public String message() {
|
||||
return message;
|
||||
}
|
||||
|
||||
|
@ -54,7 +58,16 @@ class Failure {
|
|||
return message;
|
||||
}
|
||||
|
||||
static Failure fail(Node<?> source, String message, Object... args) {
|
||||
public static Failure fail(Node<?> source, String message, Object... args) {
|
||||
return new Failure(source, format(message, args));
|
||||
}
|
||||
|
||||
public static String failMessage(Collection<Failure> failures) {
|
||||
return failures.stream().map(f -> {
|
||||
Location l = f.node().source().source();
|
||||
return "line " + l.getLineNumber() + ":" + l.getColumnNumber() + ": " + f.message();
|
||||
}).collect(Collectors.joining(StringUtils.NEW_LINE,
|
||||
format("Found {} problem{}\n", failures.size(), failures.size() > 1 ? "s" : StringUtils.EMPTY),
|
||||
StringUtils.EMPTY));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.eql.plan.physical;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer;
|
||||
import org.elasticsearch.xpack.eql.session.EqlSession;
|
||||
import org.elasticsearch.xpack.eql.session.Results;
|
||||
import org.elasticsearch.xpack.ql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.ql.tree.NodeInfo;
|
||||
import org.elasticsearch.xpack.ql.tree.Source;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class EsQueryExec extends LeafExec {
|
||||
|
||||
private final String index;
|
||||
private final List<Attribute> output;
|
||||
private final QueryContainer queryContainer;
|
||||
|
||||
public EsQueryExec(Source source, String index, List<Attribute> output, QueryContainer queryContainer) {
|
||||
super(source);
|
||||
this.index = index;
|
||||
this.output = output;
|
||||
this.queryContainer = queryContainer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NodeInfo<EsQueryExec> info() {
|
||||
return NodeInfo.create(this, EsQueryExec::new, index, output, queryContainer);
|
||||
}
|
||||
|
||||
public EsQueryExec with(QueryContainer queryContainer) {
|
||||
return new EsQueryExec(source(), index, output, queryContainer);
|
||||
}
|
||||
|
||||
public String index() {
|
||||
return index;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Attribute> output() {
|
||||
return output;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(EqlSession session, ActionListener<Results> listener) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(index, queryContainer, output);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
EsQueryExec other = (EsQueryExec) obj;
|
||||
return Objects.equals(index, other.index)
|
||||
&& Objects.equals(queryContainer, other.queryContainer)
|
||||
&& Objects.equals(output, other.output);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String nodeString() {
|
||||
return nodeName() + "[" + index + "," + queryContainer + "]";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.eql.plan.physical;
|
||||
|
||||
import org.elasticsearch.xpack.ql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.ql.expression.Expression;
|
||||
import org.elasticsearch.xpack.ql.tree.NodeInfo;
|
||||
import org.elasticsearch.xpack.ql.tree.Source;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class FilterExec extends UnaryExec implements Unexecutable {
|
||||
|
||||
private final Expression condition;
|
||||
private final boolean onAggs;
|
||||
|
||||
public FilterExec(Source source, PhysicalPlan child, Expression condition) {
|
||||
this(source, child, condition, false);
|
||||
}
|
||||
|
||||
public FilterExec(Source source, PhysicalPlan child, Expression condition, boolean onAggs) {
|
||||
super(source, child);
|
||||
this.condition = condition;
|
||||
this.onAggs = onAggs;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NodeInfo<FilterExec> info() {
|
||||
return NodeInfo.create(this, FilterExec::new, child(), condition, onAggs);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected FilterExec replaceChild(PhysicalPlan newChild) {
|
||||
return new FilterExec(source(), newChild, condition, onAggs);
|
||||
}
|
||||
|
||||
public Expression condition() {
|
||||
return condition;
|
||||
}
|
||||
|
||||
public boolean onAggs() {
|
||||
return onAggs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Attribute> output() {
|
||||
return child().output();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(condition, onAggs, child());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
FilterExec other = (FilterExec) obj;
|
||||
return onAggs == other.onAggs
|
||||
&& Objects.equals(condition, other.condition)
|
||||
&& Objects.equals(child(), other.child());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.eql.plan.physical;
|
||||
|
||||
import org.elasticsearch.xpack.ql.tree.Source;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public abstract class LeafExec extends PhysicalPlan {
|
||||
|
||||
protected LeafExec(Source source) {
|
||||
super(source, Collections.emptyList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public final LeafExec replaceChildren(List<PhysicalPlan> newChildren) {
|
||||
throw new UnsupportedOperationException("this type of node doesn't have any children to replace");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,56 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.eql.plan.physical;
|
||||
|
||||
import org.elasticsearch.xpack.ql.expression.Expression;
|
||||
import org.elasticsearch.xpack.ql.tree.NodeInfo;
|
||||
import org.elasticsearch.xpack.ql.tree.Source;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class LimitExec extends UnaryExec implements Unexecutable {
|
||||
|
||||
private final Expression limit;
|
||||
|
||||
public LimitExec(Source source, PhysicalPlan child, Expression limit) {
|
||||
super(source, child);
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NodeInfo<LimitExec> info() {
|
||||
return NodeInfo.create(this, LimitExec::new, child(), limit);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected LimitExec replaceChild(PhysicalPlan newChild) {
|
||||
return new LimitExec(source(), newChild, limit);
|
||||
}
|
||||
|
||||
public Expression limit() {
|
||||
return limit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(limit, child());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
LimitExec other = (LimitExec) obj;
|
||||
return Objects.equals(limit, other.limit)
|
||||
&& Objects.equals(child(), other.child());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.eql.plan.physical;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.xpack.eql.session.EmptyExecutable;
|
||||
import org.elasticsearch.xpack.eql.session.EqlSession;
|
||||
import org.elasticsearch.xpack.eql.session.Executable;
|
||||
import org.elasticsearch.xpack.eql.session.Results;
|
||||
import org.elasticsearch.xpack.ql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.ql.tree.NodeInfo;
|
||||
import org.elasticsearch.xpack.ql.tree.Source;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class LocalExec extends LeafExec {
|
||||
|
||||
private final Executable executable;
|
||||
|
||||
public LocalExec(Source source, Executable executable) {
|
||||
super(source);
|
||||
this.executable = executable;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NodeInfo<LocalExec> info() {
|
||||
return NodeInfo.create(this, LocalExec::new, executable);
|
||||
}
|
||||
|
||||
public Executable executable() {
|
||||
return executable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Attribute> output() {
|
||||
return executable.output();
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return executable instanceof EmptyExecutable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(EqlSession session, ActionListener<Results> listener) {
|
||||
executable.execute(session, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(executable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
LocalExec other = (LocalExec) obj;
|
||||
return Objects.equals(executable, other.executable);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.eql.plan.physical;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.xpack.eql.session.EqlSession;
|
||||
import org.elasticsearch.xpack.eql.session.Executable;
|
||||
import org.elasticsearch.xpack.eql.session.Results;
|
||||
import org.elasticsearch.xpack.ql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.ql.tree.NodeInfo;
|
||||
import org.elasticsearch.xpack.ql.tree.Source;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
|
||||
public class LocalRelation extends LogicalPlan implements Executable {
|
||||
|
||||
private final Executable executable;
|
||||
|
||||
public LocalRelation(Source source, Executable executable) {
|
||||
super(source, emptyList());
|
||||
this.executable = executable;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NodeInfo<LocalRelation> info() {
|
||||
return NodeInfo.create(this, LocalRelation::new, executable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogicalPlan replaceChildren(List<LogicalPlan> newChildren) {
|
||||
throw new UnsupportedOperationException("this type of node doesn't have any children to replace");
|
||||
}
|
||||
|
||||
public Executable executable() {
|
||||
return executable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean expressionsResolved() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Attribute> output() {
|
||||
return executable.output();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(EqlSession session, ActionListener<Results> listener) {
|
||||
executable.execute(session, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return executable.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
LocalRelation other = (LocalRelation) obj;
|
||||
return Objects.equals(executable, other.executable);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.eql.plan.physical;
|
||||
|
||||
import org.elasticsearch.xpack.ql.expression.Order;
|
||||
import org.elasticsearch.xpack.ql.tree.NodeInfo;
|
||||
import org.elasticsearch.xpack.ql.tree.Source;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class OrderExec extends UnaryExec implements Unexecutable {
|
||||
|
||||
private final List<Order> order;
|
||||
|
||||
public OrderExec(Source source, PhysicalPlan child, List<Order> order) {
|
||||
super(source, child);
|
||||
this.order = order;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NodeInfo<OrderExec> info() {
|
||||
return NodeInfo.create(this, OrderExec::new, child(), order);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected OrderExec replaceChild(PhysicalPlan newChild) {
|
||||
return new OrderExec(source(), newChild, order);
|
||||
}
|
||||
|
||||
public List<Order> order() {
|
||||
return order;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(order, child());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
OrderExec other = (OrderExec) obj;
|
||||
|
||||
return Objects.equals(order, other.order)
|
||||
&& Objects.equals(child(), other.child());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.eql.plan.physical;
|
||||
|
||||
import org.elasticsearch.xpack.ql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.ql.tree.Source;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public abstract class UnaryExec extends PhysicalPlan {
|
||||
|
||||
private final PhysicalPlan child;
|
||||
|
||||
protected UnaryExec(Source source, PhysicalPlan child) {
|
||||
super(source, Collections.singletonList(child));
|
||||
this.child = child;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final PhysicalPlan replaceChildren(List<PhysicalPlan> newChildren) {
|
||||
if (newChildren.size() != 1) {
|
||||
throw new IllegalArgumentException("expected [1] child but received [" + newChildren.size() + "]");
|
||||
}
|
||||
return replaceChild(newChildren.get(0));
|
||||
}
|
||||
|
||||
protected abstract UnaryExec replaceChild(PhysicalPlan newChild);
|
||||
|
||||
public PhysicalPlan child() {
|
||||
return child;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Attribute> output() {
|
||||
return child.output();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(child());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
UnaryExec other = (UnaryExec) obj;
|
||||
|
||||
return Objects.equals(child, other.child);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.eql.plan.physical;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.xpack.eql.planner.PlanningException;
|
||||
import org.elasticsearch.xpack.eql.session.EqlSession;
|
||||
import org.elasticsearch.xpack.eql.session.Executable;
|
||||
import org.elasticsearch.xpack.eql.session.Results;
|
||||
|
||||
|
||||
// this is mainly a marker interface to validate a plan before being executed
|
||||
public interface Unexecutable extends Executable {
|
||||
|
||||
@Override
|
||||
default void execute(EqlSession session, ActionListener<Results> listener) {
|
||||
throw new PlanningException("Current plan {} is not executable", this);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.eql.plan.physical;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.xpack.eql.planner.PlanningException;
|
||||
import org.elasticsearch.xpack.eql.session.EqlSession;
|
||||
import org.elasticsearch.xpack.eql.session.Results;
|
||||
import org.elasticsearch.xpack.ql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.ql.tree.NodeInfo;
|
||||
import org.elasticsearch.xpack.ql.tree.Source;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class UnplannedExec extends LeafExec implements Unexecutable {
|
||||
|
||||
private final LogicalPlan plan;
|
||||
|
||||
public UnplannedExec(Source source, LogicalPlan plan) {
|
||||
super(source);
|
||||
this.plan = plan;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NodeInfo<UnplannedExec> info() {
|
||||
return NodeInfo.create(this, UnplannedExec::new, plan);
|
||||
}
|
||||
|
||||
public LogicalPlan plan() {
|
||||
return plan;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Attribute> output() {
|
||||
return plan.output();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(EqlSession session, ActionListener<Results> listener) {
|
||||
throw new PlanningException("Current plan {} is not executable", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return plan.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
UnplannedExec other = (UnplannedExec) obj;
|
||||
return Objects.equals(plan, other.plan);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String nodeString() {
|
||||
return nodeName() + "[" + plan.nodeString() + "]";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.eql.planner;
|
||||
|
||||
import org.elasticsearch.xpack.eql.plan.physical.EsQueryExec;
|
||||
import org.elasticsearch.xpack.eql.plan.physical.FilterExec;
|
||||
import org.elasticsearch.xpack.eql.plan.physical.LimitExec;
|
||||
import org.elasticsearch.xpack.eql.plan.physical.LocalExec;
|
||||
import org.elasticsearch.xpack.eql.plan.physical.LocalRelation;
|
||||
import org.elasticsearch.xpack.eql.plan.physical.OrderExec;
|
||||
import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan;
|
||||
import org.elasticsearch.xpack.eql.plan.physical.UnplannedExec;
|
||||
import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer;
|
||||
import org.elasticsearch.xpack.ql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.ql.plan.logical.EsRelation;
|
||||
import org.elasticsearch.xpack.ql.plan.logical.Filter;
|
||||
import org.elasticsearch.xpack.ql.plan.logical.Limit;
|
||||
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.ql.plan.logical.OrderBy;
|
||||
import org.elasticsearch.xpack.ql.rule.Rule;
|
||||
import org.elasticsearch.xpack.ql.rule.RuleExecutor;
|
||||
import org.elasticsearch.xpack.ql.util.ReflectionUtils;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
class Mapper extends RuleExecutor<PhysicalPlan> {
|
||||
|
||||
PhysicalPlan map(LogicalPlan plan) {
|
||||
return execute(planLater(plan));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Iterable<RuleExecutor<PhysicalPlan>.Batch> batches() {
|
||||
Batch conversion = new Batch("Mapping", new SimpleExecMapper());
|
||||
|
||||
return Arrays.asList(conversion);
|
||||
}
|
||||
|
||||
private static PhysicalPlan planLater(LogicalPlan plan) {
|
||||
return new UnplannedExec(plan.source(), plan);
|
||||
}
|
||||
|
||||
private static class SimpleExecMapper extends MapExecRule<LogicalPlan> {
|
||||
|
||||
@Override
|
||||
protected PhysicalPlan map(LogicalPlan p) {
|
||||
|
||||
if (p instanceof LocalRelation) {
|
||||
return new LocalExec(p.source(), ((LocalRelation) p).executable());
|
||||
}
|
||||
|
||||
if (p instanceof Filter) {
|
||||
Filter fl = (Filter) p;
|
||||
return new FilterExec(p.source(), map(fl.child()), fl.condition());
|
||||
}
|
||||
|
||||
if (p instanceof OrderBy) {
|
||||
OrderBy o = (OrderBy) p;
|
||||
return new OrderExec(p.source(), map(o.child()), o.order());
|
||||
}
|
||||
|
||||
if (p instanceof Limit) {
|
||||
Limit l = (Limit) p;
|
||||
return new LimitExec(p.source(), map(l.child()), l.limit());
|
||||
}
|
||||
|
||||
if (p instanceof EsRelation) {
|
||||
EsRelation c = (EsRelation) p;
|
||||
List<Attribute> output = c.output();
|
||||
QueryContainer container = new QueryContainer();
|
||||
if (c.frozen()) {
|
||||
container = container.withFrozen();
|
||||
}
|
||||
return new EsQueryExec(p.source(), c.index().name(), output, container);
|
||||
}
|
||||
|
||||
return planLater(p);
|
||||
}
|
||||
}
|
||||
|
||||
abstract static class MapExecRule<SubPlan extends LogicalPlan> extends Rule<UnplannedExec, PhysicalPlan> {
|
||||
|
||||
private final Class<SubPlan> subPlanToken = ReflectionUtils.detectSuperTypeForRuleLike(getClass());
|
||||
|
||||
@Override
|
||||
public final PhysicalPlan apply(PhysicalPlan plan) {
|
||||
return plan.transformUp(this::rule, UnplannedExec.class);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
protected final PhysicalPlan rule(UnplannedExec plan) {
|
||||
LogicalPlan subPlan = plan.plan();
|
||||
if (subPlanToken.isInstance(subPlan)) {
|
||||
return map((SubPlan) subPlan);
|
||||
}
|
||||
return plan;
|
||||
}
|
||||
|
||||
protected abstract PhysicalPlan map(SubPlan plan);
|
||||
}
|
||||
}
|
|
@ -6,12 +6,42 @@
|
|||
|
||||
package org.elasticsearch.xpack.eql.planner;
|
||||
|
||||
import org.elasticsearch.xpack.eql.common.Failure;
|
||||
import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan;
|
||||
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class Planner {
|
||||
|
||||
private final Mapper mapper = new Mapper();
|
||||
private final QueryFolder folder = new QueryFolder();
|
||||
|
||||
public PhysicalPlan plan(LogicalPlan plan) {
|
||||
throw new UnsupportedOperationException();
|
||||
return foldPlan(mapPlan(plan));
|
||||
}
|
||||
|
||||
PhysicalPlan mapPlan(LogicalPlan plan) {
|
||||
return verifyMappingPlan(mapper.map(plan));
|
||||
}
|
||||
|
||||
PhysicalPlan foldPlan(PhysicalPlan mapped) {
|
||||
return verifyExecutingPlan(folder.fold(mapped));
|
||||
}
|
||||
|
||||
PhysicalPlan verifyMappingPlan(PhysicalPlan plan) {
|
||||
List<Failure> failures = Verifier.verifyMappingPlan(plan);
|
||||
if (failures.isEmpty() == false) {
|
||||
throw new PlanningException(failures);
|
||||
}
|
||||
return plan;
|
||||
}
|
||||
|
||||
PhysicalPlan verifyExecutingPlan(PhysicalPlan plan) {
|
||||
List<Failure> failures = Verifier.verifyExecutingPlan(plan);
|
||||
if (failures.isEmpty() == false) {
|
||||
throw new PlanningException(failures);
|
||||
}
|
||||
return plan;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.eql.planner;
|
||||
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.xpack.eql.EqlClientException;
|
||||
import org.elasticsearch.xpack.eql.common.Failure;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
public class PlanningException extends EqlClientException {
|
||||
|
||||
public PlanningException(String message, Object... args) {
|
||||
super(message, args);
|
||||
}
|
||||
|
||||
protected PlanningException(Collection<Failure> sources) {
|
||||
super(Failure.failMessage(sources));
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestStatus status() {
|
||||
return RestStatus.BAD_REQUEST;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.eql.planner;
|
||||
|
||||
import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan;
|
||||
import org.elasticsearch.xpack.ql.rule.RuleExecutor;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
|
||||
class QueryFolder extends RuleExecutor<PhysicalPlan> {
|
||||
|
||||
PhysicalPlan fold(PhysicalPlan plan) {
|
||||
return execute(plan);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Iterable<RuleExecutor<PhysicalPlan>.Batch> batches() {
|
||||
return emptyList();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.eql.planner;
|
||||
|
||||
import org.elasticsearch.xpack.eql.common.Failure;
|
||||
import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan;
|
||||
import org.elasticsearch.xpack.eql.plan.physical.Unexecutable;
|
||||
import org.elasticsearch.xpack.eql.plan.physical.UnplannedExec;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.xpack.eql.common.Failure.fail;
|
||||
|
||||
abstract class Verifier {
|
||||
|
||||
static List<Failure> verifyMappingPlan(PhysicalPlan plan) {
|
||||
List<Failure> failures = new ArrayList<>();
|
||||
|
||||
plan.forEachUp(p -> {
|
||||
if (p instanceof UnplannedExec) {
|
||||
failures.add(fail(p, "Unplanned item"));
|
||||
}
|
||||
p.forEachExpressionsUp(e -> {
|
||||
if (e.childrenResolved() && !e.resolved()) {
|
||||
failures.add(fail(e, "Unresolved expression"));
|
||||
}
|
||||
});
|
||||
});
|
||||
return failures;
|
||||
}
|
||||
|
||||
static List<Failure> verifyExecutingPlan(PhysicalPlan plan) {
|
||||
List<Failure> failures = new ArrayList<>();
|
||||
|
||||
plan.forEachUp(p -> {
|
||||
if (p instanceof Unexecutable) {
|
||||
failures.add(fail(p, "Unexecutable item"));
|
||||
}
|
||||
p.forEachExpressionsUp(e -> {
|
||||
if (e.childrenResolved() && !e.resolved()) {
|
||||
failures.add(fail(e, "Unresolved expression"));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
return failures;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.eql.querydsl.container;
|
||||
|
||||
public class QueryContainer {
|
||||
|
||||
public QueryContainer withFrozen() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.eql.session;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.xpack.ql.expression.Attribute;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class EmptyExecutable implements Executable {
|
||||
|
||||
private final List<Attribute> output;
|
||||
|
||||
public EmptyExecutable(List<Attribute> output) {
|
||||
this.output = output;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Attribute> output() {
|
||||
return output;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(EqlSession session, ActionListener<Results> listener) {
|
||||
listener.onResponse(Results.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return output.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
EmptyExecutable other = (EmptyExecutable) obj;
|
||||
return Objects.equals(output, other.output);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return output.toString();
|
||||
}
|
||||
}
|
|
@ -7,13 +7,17 @@
|
|||
package org.elasticsearch.xpack.eql.session;
|
||||
|
||||
import org.apache.lucene.search.TotalHits;
|
||||
import org.apache.lucene.search.TotalHits.Relation;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
|
||||
public class Results {
|
||||
|
||||
private final TotalHits totalHits;
|
||||
public static final Results EMPTY = new Results(new TotalHits(0, Relation.EQUAL_TO), emptyList());
|
||||
|
||||
private final TotalHits totalHits;
|
||||
private final List<Object> results;
|
||||
|
||||
public Results(TotalHits totalHits, List<Object> results) {
|
||||
|
|
|
@ -38,7 +38,7 @@ public class VerifierTests extends ESTestCase {
|
|||
private String error(IndexResolution resolution, String eql) {
|
||||
VerificationException e = expectThrows(VerificationException.class, () -> accept(resolution, eql));
|
||||
assertTrue(e.getMessage().startsWith("Found "));
|
||||
String header = "Found 1 problem(s)\nline ";
|
||||
String header = "Found 1 problem\nline ";
|
||||
return e.getMessage().substring(header.length());
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.eql.planner;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.eql.analysis.Analyzer;
|
||||
import org.elasticsearch.xpack.eql.analysis.PreAnalyzer;
|
||||
import org.elasticsearch.xpack.eql.analysis.Verifier;
|
||||
import org.elasticsearch.xpack.eql.expression.function.EqlFunctionRegistry;
|
||||
import org.elasticsearch.xpack.eql.optimizer.Optimizer;
|
||||
import org.elasticsearch.xpack.eql.parser.EqlParser;
|
||||
import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan;
|
||||
import org.elasticsearch.xpack.ql.QlClientException;
|
||||
import org.elasticsearch.xpack.ql.index.EsIndex;
|
||||
import org.elasticsearch.xpack.ql.index.IndexResolution;
|
||||
|
||||
import static org.elasticsearch.xpack.ql.type.TypesTests.loadMapping;
|
||||
|
||||
public class QueryFolderTests extends ESTestCase {
|
||||
|
||||
private EqlParser parser = new EqlParser();
|
||||
private PreAnalyzer preAnalyzer = new PreAnalyzer();
|
||||
private Analyzer analyzer = new Analyzer(new EqlFunctionRegistry(), new Verifier());
|
||||
private Optimizer optimizer = new Optimizer();
|
||||
private Planner planner = new Planner();
|
||||
|
||||
private IndexResolution index = IndexResolution.valid(new EsIndex("test", loadMapping("mapping-default.json")));
|
||||
|
||||
|
||||
private PhysicalPlan plan(IndexResolution resolution, String eql) {
|
||||
return planner.plan(optimizer.optimize(analyzer.analyze(preAnalyzer.preAnalyze(parser.createStatement(eql), resolution))));
|
||||
}
|
||||
|
||||
private PhysicalPlan plan(String eql) {
|
||||
return plan(index, eql);
|
||||
}
|
||||
|
||||
public void testBasicPlan() throws Exception {
|
||||
expectThrows(QlClientException.class, "not yet implemented", () -> plan("process where true"));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue